Source code for climada.engine.forecast

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

Define Forecast
"""

__all__ = ["Forecast"]


import logging
import datetime as dt
from typing import Dict, Optional
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
from matplotlib.ticker import PercentFormatter, ScalarFormatter
from matplotlib.colors import ListedColormap, BoundaryNorm
import cartopy.crs as ccrs
from matplotlib import colormaps as cm
import pyproj
import shapely
from cartopy.io import shapereader
from mpl_toolkits.axes_grid1 import make_axes_locatable

from climada.hazard import Hazard
from climada.entity import Exposures
from climada.entity.impact_funcs import ImpactFuncSet
from climada.engine import ImpactCalc
import climada.util.plot as u_plot
from climada.util.config import CONFIG
from climada.util.files_handler import to_list
import climada.util.coordinates as u_coord
from climada.util.value_representation import (
    value_to_monetary_unit as u_value_to_monetary_unit,
)

LOGGER = logging.getLogger(__name__)

DATA_DIR = CONFIG.local_data.save_dir.str()

FORECAST_DIR = CONFIG.engine.forecast.local_data.str()

FORECAST_PLOT_DIR = CONFIG.engine.forecast.plot_dir.dir()

# defining colormaps
# The colors are in line the european meteoalarm colors http://www.meteoalarm.info/
# and with the colors used at MeteoSwiss
# https://www.natural-hazards.ch/home/dealing-with-natural-hazards/explanation-of-the-danger-levels.html
# colors for warning levels
COLORS_WARN = np.array(
    [
        [204 / 255, 255 / 255, 102 / 255, 1],  # green
        [255 / 255, 255 / 255, 0 / 255, 1],  # yellow
        [255 / 255, 153 / 255, 0 / 255, 1],  # orange
        [255 / 255, 0 / 255, 0 / 255, 1],  # red
        [128 / 255, 0 / 255, 0 / 255, 1],  # dark red
    ]
)
# colors for warning probabilities
warnprob_colors = np.array(
    [
        [255 / 255, 255 / 255, 255 / 255, 1],  # white
        [215 / 255, 227 / 255, 238 / 255, 1],  # lightest blue
        [181 / 255, 202 / 255, 255 / 255, 1],  # ligthblue
        [127 / 255, 151 / 255, 255 / 255, 1],  # blue
        [171 / 255, 207 / 255, 99 / 255, 1],  # green
        [232 / 255, 245 / 255, 158 / 255, 1],  # light yellow?
        [255 / 255, 250 / 255, 20 / 255, 1],  # yellow
        [255 / 255, 209 / 255, 33 / 255, 1],  # light orange
        [255 / 255, 163 / 255, 10 / 255, 1],  # orange
        [255 / 255, 76 / 255, 0 / 255, 1],  # red
    ]
)
warnprob_colors_extended = np.repeat(warnprob_colors, 10, axis=0)
CMAP_WARNPROB = ListedColormap(warnprob_colors_extended)
# colors for impact forecast
color_map_pre = cm.get_cmap("plasma").resampled(90)
impact_colors = color_map_pre(np.linspace(0, 1, 90))
white_extended = np.repeat([[255 / 255, 255 / 255, 255 / 255, 1]], 10, axis=0)
impact_colors_extended = np.append(white_extended, impact_colors, axis=0)
CMAP_IMPACT = ListedColormap(impact_colors_extended)


[docs] class Forecast: """Forecast definition. Compute an impact forecast with predefined hazard originating from a forecast (like numerical weather prediction models), exposure and impact. Use the calc() method to calculate a forecasted impact. Then use the plotting methods to illustrate the forecasted impacts. By default plots are saved under in a '/forecast/plots' folder in the configurable save_dir in local_data (see climada.util.config) under a name summarizing the Hazard type, haz model name, initialization time of the forecast run, event date, exposure name and the plot title. As the class is relatively new, there might be future changes to the attributes, the methods, and the parameters used to call the methods. It was discovered at some point, that there might be a memory leak in matplotlib even when figures are closed (https://github.com/matplotlib/matplotlib/issues/8519). Due to this reason the plotting functions in this module have the flag close_fig, to close figures within the function scope, which might mitigate that problem if a script runs this plotting functions many times. Attributes ---------- run_datetime : list of datetime.datetime initialization time of the forecast model run used to create the Hazard event_date: datetime.datetime Date on which the Hazard event takes place hazard : list of CLIMADA Hazard List of the hazard forecast with different lead times. haz_model : str Short string specifying the model used to create the hazard, if possible three big letters. exposure : Exposure an CLIMADA Exposures containg values at risk exposure_name : str string specifying the exposure (e.g. 'EU'), which is used to name output files. vulnerability : ImpactFuncSet Set of impact functions used in the impact calculation. """
[docs] def __init__( self, hazard_dict: Dict[str, Hazard], exposure: Exposures, impact_funcs: ImpactFuncSet, haz_model: str = "NWP", exposure_name: Optional[str] = None ): """Initialization with hazard, exposure and vulnerability. Parameters ---------- hazard_dict : dict Dictionary of the format {run_datetime: Hazard} with run_datetime being the initialization time of a weather forecast run and Hazard being a CLIMADA Hazard derived from that forecast for one event. A probabilistic representation of that one event is possible, as long as the attribute Hazard.date is the same for all events. Several run_datetime:Hazard combinations for the same event can be provided. exposure : Exposures impact_funcs : ImpactFuncSet haz_model : str, optional Short string specifying the model used to create the hazard, if possible three big letters. Default is 'NWP' for numerical weather prediction. exposure_name : str, optional string specifying the exposure (e.g. 'EU'), which is used to name output files. If ``None``, the name will be inferred from the Exposures GeoDataframe ``region_id`` column, using the corresponding name of the region with the lowest ISO 3166-1 numeric code. If that fails, it defaults to ``"custom"``. """ self.run_datetime = list(hazard_dict.keys()) self.hazard = list(hazard_dict.values()) # check event_date hazard_date = np.unique( [date for hazard in self.hazard for date in hazard.date] ) if not len(hazard_date) == 1: raise ValueError( "Please provide hazards containing only one " + "event_date. The current hazards contain several " + "events with different event_dates and the Forecast " + "class cannot function proberly with such hazards." ) self.event_date = dt.datetime.fromordinal(hazard_date[0]) self.haz_model = haz_model self.exposure = exposure if exposure_name is None: try: self.exposure_name = u_coord.country_to_iso( exposure.gdf.region_id.unique()[0], "name" ) except (KeyError, AttributeError): self.exposure_name = "custom" else: self.exposure_name = exposure_name self.vulnerability = impact_funcs self._impact = [None for dt in self.run_datetime]
[docs] def ei_exp(self, run_datetime=None): """ Expected impact per exposure Parameters ---------- run_datetime : datetime.datetime, optional Select the used hazard by the run_datetime, default is first element of attribute run_datetime. Returns ------- float """ if run_datetime is None: run_datetime = self.run_datetime[0] haz_ind = np.argwhere(np.isin(self.run_datetime, run_datetime))[0][0] return self._impact[haz_ind].eai_exp
[docs] def ai_agg(self, run_datetime=None): """average impact aggregated over all exposures Parameters ---------- run_datetime : datetime.datetime, optional Select the used hazard by the run_datetime, default is first element of attribute run_datetime. Returns ------- float """ if run_datetime is None: run_datetime = self.run_datetime[0] haz_ind = np.argwhere(np.isin(self.run_datetime, run_datetime))[0][0] return self._impact[haz_ind].aai_agg
[docs] def haz_summary_str(self, run_datetime=None): """provide a summary string for the hazard part of the forecast Parameters ---------- run_datetime : datetime.datetime, optional Select the used hazard by the run_datetime, default is first element of attribute run_datetime. Returns ------- str summarizing the most important information about the hazard """ if run_datetime is None: run_datetime = self.run_datetime[0] haz_ind = np.argwhere(np.isin(self.run_datetime, run_datetime))[0][0] return ( self.hazard[haz_ind].haz_type + "_" + self.haz_model + "_run" + run_datetime.strftime("%Y%m%d%H") + "_event" + self.event_date.strftime("%Y%m%d") )
[docs] def summary_str(self, run_datetime=None): """provide a summary string for the impact forecast Parameters ---------- run_datetime: datetime.datetime, optional Select the used hazard by the run_datetime, default is first element of attribute run_datetime. Returns ------- str summarizing the most important information about the impact forecast """ if run_datetime is None: run_datetime = self.run_datetime[0] return self.haz_summary_str(run_datetime) + "_" + self.exposure_name
[docs] def lead_time(self, run_datetime=None): """provide the lead time for the impact forecast Parameters ---------- run_datetime : datetime.datetime, optional Select the used hazard by the run_datetime, default is first element of attribute run_datetime. Returns ------- datetime.timedelta the difference between the initialization time of the forecast model run and the date of the event, commenly named lead time """ if run_datetime is None: run_datetime = self.run_datetime[0] return self.event_date - run_datetime
[docs] def calc(self, force_reassign=False): """calculate the impacts for all lead times using exposure, all hazards of all run_datetime, and ImpactFunctionSet. Parameters ---------- force_reassign : bool, optional Reassign hazard centroids to the exposure for all hazards, default is false. """ # calc impact if self.hazard: self.exposure.assign_centroids(self.hazard[0], overwrite=force_reassign) for ind_i, haz_i in enumerate(self.hazard): self._impact[ind_i] = ImpactCalc(self.exposure, self.vulnerability, haz_i)\ .impact(save_mat=True, assign_centroids=False)
[docs] def plot_imp_map( self, run_datetime=None, explain_str=None, save_fig=True, close_fig=False, polygon_file=None, polygon_file_crs="epsg:4326", proj=ccrs.PlateCarree(), figsize=(9, 13), adapt_fontsize=True, ): """ plot a map of the impacts Parameters ---------- run_datetime : datetime.datetime, optional Select the used hazard by the run_datetime, default is first element of attribute run_datetime. explain_str : str, optional Short str which explains type of impact, explain_str is included in the title of the figure. default is 'mean building damage caused by wind' save_fig : bool, optional Figure is saved if True, folder is within your configurable save_dir and filename is derived from the method summary_str() (for more details see class docstring). Default is True. close_fig : bool, optional Figure not drawn if True. Default is False. polygon_file : str, optional Points to a .shp-file with polygons do be drawn as outlines on the plot, default is None to not draw the lines. please also specify the crs in the parameter polygon_file_crs. polygon_file_crs : str, optional String of pattern <provider>:<code> specifying the crs. has to be readable by pyproj.Proj. Default is 'epsg:4326'. proj : ccrs coordinate reference system used in coordinates The default is ccrs.PlateCarree() figsize : tuple figure size for plt.subplots, width, height in inches The default is (9, 13) adapt_fontsize : bool, optional If set to true, the size of the fonts will be adapted to the size of the figure. Otherwise the default matplotlib font size is used. Default is True. Returns ------- axes: cartopy.mpl.geoaxes.GeoAxesSubplot """ # select hazard with run_datetime if run_datetime is None: run_datetime = self.run_datetime[0] haz_ind = np.argwhere(np.isin(self.run_datetime, run_datetime))[0][0] # plot impact map of all runs map_file_name = self.summary_str(run_datetime) + "_impact_map" + ".jpeg" map_file_name_full = FORECAST_PLOT_DIR / map_file_name lead_time_str = "{:.0f}".format( self.lead_time(run_datetime).days + self.lead_time(run_datetime).seconds / 60 / 60 / 24 ) title_dict = { "event_day": self.event_date.strftime("%a %d %b %Y 00-24UTC"), "run_start": ( run_datetime.strftime("%d.%m.%Y %HUTC +") + lead_time_str + "d" ), "explain_text": "mean building damage caused by wind" if explain_str is None else explain_str, "model_text": "CLIMADA IMPACT", } fig, axes = self._plot_imp_map( run_datetime, title=title_dict, cbar_label=( "forecasted damage per gridcell [" + self._impact[haz_ind].unit + "]" ), polygon_file=polygon_file, polygon_file_crs=polygon_file_crs, proj=proj, figsize=figsize, adapt_fontsize=adapt_fontsize, ) if save_fig: fig.savefig(map_file_name_full) if close_fig: fig.clf() plt.close(fig) return axes
def _plot_imp_map( self, run_datetime, title, cbar_label, polygon_file=None, polygon_file_crs="epsg:4326", proj=ccrs.PlateCarree(), figsize=(9, 13), adapt_fontsize=True, ): # select hazard with run_datetime # pylint: disable=protected-access if run_datetime is None: run_datetime = self.run_datetime[0] haz_ind = np.argwhere(np.isin(self.run_datetime, run_datetime))[0][0] # tryout new plot with right projection extend = "neither" value = self._impact[haz_ind].eai_exp # value[np.invert(mask)] = np.nan coord = self._impact[haz_ind].coord_exp # Generate array of values used in each subplot array_sub = value shapes = True if not polygon_file: shapes = False var_name = cbar_label geo_coord = coord num_im, list_arr = u_plot._get_collection_arrays(array_sub) list_tit = to_list(num_im, title, "title") list_name = to_list(num_im, var_name, "var_name") list_coord = to_list(num_im, geo_coord, "geo_coord") kwargs = dict() kwargs["cmap"] = CMAP_IMPACT kwargs["s"] = 5 kwargs["marker"] = "," kwargs["norm"] = BoundaryNorm( np.append( np.append([0], [10**x for x in np.arange(0, 2.9, 2.9 / 9)]), [10**x for x in np.arange(3, 7, 4 / 90)], ), CMAP_IMPACT.N, clip=True, ) # Generate each subplot fig, axis_sub, _fontsize = u_plot.make_map( num_im, proj=proj, figsize=figsize, adapt_fontsize=adapt_fontsize ) if not isinstance(axis_sub, np.ndarray): axis_sub = np.array([[axis_sub]]) fig.set_size_inches(9, 8) for array_im, axis, tit, name, coord in zip( list_arr, axis_sub.flatten(), list_tit, list_name, list_coord ): if coord.shape[0] != array_im.size: raise ValueError( "Size mismatch in input array: %s != %s." % (coord.shape[0], array_im.size) ) # Binned image with coastlines extent = u_plot._get_borders(coord) axis.set_extent((extent), ccrs.PlateCarree()) hex_bin = axis.scatter( coord[:, 1], coord[:, 0], c=array_im, transform=ccrs.PlateCarree(), **kwargs ) if shapes: # add warning regions shp = shapereader.Reader(polygon_file) transformer = pyproj.Transformer.from_crs( polygon_file_crs, self._impact[haz_ind].crs, always_xy=True ) for geometry, _ in zip(shp.geometries(), shp.records()): geom2 = shapely.ops.transform(transformer.transform, geometry) axis.add_geometries( [geom2], crs=ccrs.PlateCarree(), facecolor="none", edgecolor="gray", ) else: # add country boundaries u_plot.add_shapes(axis) # Create colorbar in this axis cbax = make_axes_locatable(axis).append_axes( "bottom", size="6.5%", pad=0.3, axes_class=plt.Axes ) cbar = plt.colorbar( hex_bin, cax=cbax, orientation="horizontal", extend=extend ) cbar.set_label(name) cbar.formatter.set_scientific(False) cbar.set_ticks([0, 1000, 10000, 100000, 1000000]) cbar.set_ticklabels(["0", "1 000", "10 000", "100 000", "1 000 000"]) title_position = { "model_text": [0.02, 0.85], "explain_text": [0.02, 0.81], "event_day": [0.98, 0.85], "run_start": [0.98, 0.81], } left_right = { "model_text": "left", "explain_text": "left", "event_day": "right", "run_start": "right", } color = { "model_text": "k", "explain_text": "k", "event_day": "r", "run_start": "k", } for t_i in tit: plt.figtext( title_position[t_i][0], title_position[t_i][1], tit[t_i], fontsize="xx-large", color=color[t_i], ha=left_right[t_i], ) fig.tight_layout() fig.subplots_adjust(top=0.8) return fig, axis_sub
[docs] def plot_hist( self, run_datetime=None, explain_str=None, save_fig=True, close_fig=False, figsize=(9, 8), ): """ plot histogram of the forecasted impacts all ensemble members Parameters ---------- run_datetime : datetime.datetime, optional Select the used hazard by the run_datetime, default is first element of attribute run_datetime. explain_str : str, optional Short str which explains type of impact, explain_str is included in the title of the figure. default is 'total building damage' save_fig : bool, optional Figure is saved if True, folder is within your configurable save_dir and filename is derived from the method summary_str() (for more details see class docstring). Default is True. close_fig : bool, optional Figure is not drawn if True. Default is False. figsize : tuple figure size for plt.subplots, width, height in inches The default is (9, 8) Returns ------- axes : matplotlib.axes.Axes """ # select hazard with run_datetime if run_datetime is None: run_datetime = self.run_datetime[0] haz_ind = np.argwhere(np.isin(self.run_datetime, run_datetime))[0][0] # plot histogram of all runs histbin_file_name = self.summary_str(run_datetime) + "_histbin" + ".svg" histbin_file_name_full = FORECAST_PLOT_DIR / histbin_file_name lower_bound = np.max( [ np.floor( np.log10(np.max([np.min(self._impact[haz_ind].at_event), 0.1])) ), 0, ] ) upper_bound = ( np.max( [ np.ceil( np.log10(np.max([np.max(self._impact[haz_ind].at_event), 0.1])) ), lower_bound + 5, ] ) + 0.1 ) bins_log = np.arange(lower_bound, upper_bound, 0.5) bins = 10**bins_log fig = plt.figure(figsize=figsize) axes = fig.add_subplot(111) plt.xscale("log") plt.hist( [np.max([x, 1]) for x in self._impact[haz_ind].at_event], bins=bins, weights=np.ones(len(self._impact[haz_ind].at_event)) / len(self._impact[haz_ind].at_event), ) axes.yaxis.set_major_formatter(PercentFormatter(1)) formatter = ScalarFormatter() formatter.set_scientific(False) # formatter.format = "%:'.0f" axes.xaxis.set_major_formatter(formatter) x_ticklabels = list() x_ticks = list() ticklabel_dict = { pow(10, i): self._number_to_str(pow(10, i)) for i in range(0, 15) } # turn int to str for i in np.arange(15): tick_i = 10.0**i x_ticks.append(tick_i) x_ticklabels.append(ticklabel_dict[tick_i]) axes.xaxis.set_ticks(x_ticks) axes.xaxis.set_ticklabels(x_ticklabels) plt.xticks(rotation=15, horizontalalignment="right") plt.xlim([(10 ** -0.25) * bins[0], (10 ** 0.25) * bins[-1]]) lead_time_str = "{:.0f}".format( self.lead_time(run_datetime).days + self.lead_time(run_datetime).seconds / 60 / 60 / 24 ) title_dict = { "event_day": self.event_date.strftime("%a %d %b %Y 00-24UTC"), "run_start": ( run_datetime.strftime("%d.%m.%Y %HUTC +") + lead_time_str + "d" ), "explain_text": ("total building damage") if explain_str is None else explain_str, "model_text": "CLIMADA IMPACT", } title_position = { "model_text": [0.13, 0.94], "explain_text": [0.13, 0.9], "event_day": [0.9, 0.94], "run_start": [0.9, 0.9], } left_right = { "model_text": "left", "explain_text": "left", "event_day": "right", "run_start": "right", } color = { "model_text": "k", "explain_text": "k", "event_day": "r", "run_start": "k", } for t_i in title_dict: plt.figtext( title_position[t_i][0], title_position[t_i][1], title_dict[t_i], fontsize="xx-large", color=color[t_i], ha=left_right[t_i], ) plt.xlabel( "forecasted total damage for " + self.exposure_name + " [" + self._impact[haz_ind].unit + "]" ) plt.ylabel("probability") plt.text( 0.75, 0.85, "mean impact:\n " + self._number_to_str(self._impact[haz_ind].at_event.mean()) + ' ' + self._impact[haz_ind].unit, horizontalalignment="center", verticalalignment="center", transform=axes.transAxes, ) if save_fig: plt.savefig(histbin_file_name_full) if close_fig: plt.clf() plt.close(fig) return axes
@staticmethod def _number_to_str(number): """Generate numbers as str (with thousand, million, trillion). Example: 1000 becomes 1 thousand Parameters ---------- number : int Returns ------- str """ number_names = { 1: "", 1000: "thousand", 1000000: "million", 1000000000: "billion", 1000000000000: "trillion", } [value], name = u_value_to_monetary_unit(number, abbreviations=number_names) return "{:.0f} {}".format(value, name)
[docs] def plot_exceedence_prob( self, threshold, explain_str=None, run_datetime=None, save_fig=True, close_fig=False, polygon_file=None, polygon_file_crs="epsg:4326", proj=ccrs.PlateCarree(), figsize=(9, 13), adapt_fontsize=True, ): """plot exceedence map Parameters ---------- threshold : float Threshold of impact unit for which exceedence probability should be plotted. explain_str : str, optional Short str which explains threshold, explain_str is included in the title of the figure. run_datetime : datetime.datetime, optional Select the used hazard by the run_datetime, default is first element of attribute run_datetime. save_fig : bool, optional Figure is saved if True, folder is within your configurable save_dir and filename is derived from the method summary_str() (for more details see class docstring). Default is True. close_fig : bool, optional Figure not drawn if True. Default is False. polygon_file : str, optional Points to a .shp-file with polygons do be drawn as outlines on the plot, default is None to not draw the lines. please also specify the crs in the parameter polygon_file_crs. polygon_file_crs : str, optional String of pattern <provider>:<code> specifying the crs. has to be readable by pyproj.Proj. Default is 'epsg:4326'. proj : ccrs coordinate reference system used in coordinates The default is ccrs.PlateCarree() figsize : tuple figure size for plt.subplots, width, height in inches The default is (9, 13) adapt_fontsize : bool, optional If set to true, the size of the fonts will be adapted to the size of the figure. Otherwise, the default matplotlib font size is used. Default is True. Returns ------- axes: cartopy.mpl.geoaxes.GeoAxesSubplot """ # select hazard with run_datetime if run_datetime is None: run_datetime = self.run_datetime[0] haz_ind = np.argwhere(np.isin(self.run_datetime, run_datetime))[0][0] exceedence_map_file_name = ( self.summary_str(run_datetime) + "_exceed_" + str(threshold) + "_map.jpeg" ) exceedence_map_file_name_full = FORECAST_PLOT_DIR / exceedence_map_file_name lead_time_str = "{:.0f}".format( self.lead_time(run_datetime).days + self.lead_time(run_datetime).seconds / 60 / 60 / 24 ) title_dict = { "event_day": self.event_date.strftime("%a %d %b %Y 00-24UTC"), "run_start": ( run_datetime.strftime("%d.%m.%Y %HUTC +") + lead_time_str + "d" ), "explain_text": ( "threshold: " + str(threshold) + " " + self._impact[haz_ind].unit ) if explain_str is None else explain_str, "model_text": "Exceedance probability map", } cbar_label = "probabilty of reaching threshold" fig, axes = self._plot_exc_prob( run_datetime, threshold, title_dict, cbar_label, proj, polygon_file=polygon_file, polygon_file_crs=polygon_file_crs, figsize=figsize, adapt_fontsize=adapt_fontsize, ) if save_fig: plt.savefig(exceedence_map_file_name_full) if close_fig: plt.clf() plt.close(fig) return axes
def _plot_exc_prob( self, run_datetime, threshold, title, cbar_label, proj=ccrs.PlateCarree(), polygon_file=None, polygon_file_crs="epsg:4326", mask=None, figsize=(9, 13), adapt_fontsize=True, ): """plot the probability of reaching a threshold""" # select hazard with run_datetime # pylint: disable=protected-access if run_datetime is None: run_datetime = self.run_datetime[0] haz_ind = np.argwhere(np.isin(self.run_datetime, run_datetime))[0][0] extend = "neither" value = np.squeeze( np.asarray( (self._impact[haz_ind].imp_mat > threshold).sum(axis=0) / self._impact[haz_ind].event_id.size ) ) if mask is not None: value[np.invert(mask)] = np.nan coord = self._impact[haz_ind].coord_exp # Generate array of values used in each subplot array_sub = value shapes = True if not polygon_file: shapes = False var_name = cbar_label geo_coord = coord num_im, list_arr = u_plot._get_collection_arrays(array_sub) list_tit = to_list(num_im, title, "title") list_name = to_list(num_im, var_name, "var_name") list_coord = to_list(num_im, geo_coord, "geo_coord") kwargs = dict() kwargs["cmap"] = CMAP_WARNPROB kwargs["s"] = 5 kwargs["marker"] = "," kwargs["norm"] = BoundaryNorm(np.linspace(0, 1, 11), CMAP_WARNPROB.N, clip=True) # Generate each subplot fig, axis_sub, _fontsize = u_plot.make_map( num_im, proj=proj, figsize=figsize, adapt_fontsize=adapt_fontsize ) if not isinstance(axis_sub, np.ndarray): axis_sub = np.array([[axis_sub]]) fig.set_size_inches(9, 8) for array_im, axis, tit, name, coord in zip( list_arr, axis_sub.flatten(), list_tit, list_name, list_coord ): if coord.shape[0] != array_im.size: raise ValueError( "Size mismatch in input array: %s != %s." % (coord.shape[0], array_im.size) ) hex_bin = axis.scatter( coord[:, 1], coord[:, 0], c=array_im, transform=ccrs.PlateCarree(), **kwargs ) if shapes: # add warning regions shp = shapereader.Reader(polygon_file) transformer = pyproj.Transformer.from_crs( polygon_file_crs, self._impact[haz_ind].crs, always_xy=True ) for geometry, _ in zip(shp.geometries(), shp.records()): geom2 = shapely.ops.transform(transformer.transform, geometry) axis.add_geometries( [geom2], crs=ccrs.PlateCarree(), facecolor="none", edgecolor="gray", ) # Create colorbar in this axis cbax = make_axes_locatable(axis).append_axes( "bottom", size="6.5%", pad=0.3, axes_class=plt.Axes ) cbar = plt.colorbar( hex_bin, cax=cbax, orientation="horizontal", extend=extend ) cbar.set_label(name) title_position = { "model_text": [0.02, 0.94], "explain_text": [0.02, 0.9], "event_day": [0.98, 0.94], "run_start": [0.98, 0.9], } left_right = { "model_text": "left", "explain_text": "left", "event_day": "right", "run_start": "right", } color = { "model_text": "k", "explain_text": "k", "event_day": "r", "run_start": "k", } for t_i in tit: plt.figtext( title_position[t_i][0], title_position[t_i][1], tit[t_i], fontsize="xx-large", color=color[t_i], ha=left_right[t_i], ) extent = u_plot._get_borders(coord) axis.set_extent((extent), ccrs.PlateCarree()) fig.tight_layout() return fig, axis_sub
[docs] def plot_warn_map( self, polygon_file=None, polygon_file_crs="epsg:4326", thresholds="default", decision_level="exposure_point", probability_aggregation=0.5, area_aggregation=0.5, title="WARNINGS", explain_text="warn level based on thresholds", run_datetime=None, proj=ccrs.PlateCarree(), figsize=(9, 13), save_fig=True, close_fig=False, adapt_fontsize=True, ): """plot map colored with 5 warning colors for all regions in provided shape file. Parameters ---------- polygon_file : str, optional path to shp-file containing warning region polygons polygon_file_crs : str, optional String of pattern <provider>:<code> specifying the crs. has to be readable by pyproj.Proj. Default is 'epsg:4326'. thresholds : list of 4 floats, optional Thresholds for coloring region in second, third, forth and fifth warning color. decision_level : str, optional Either 'exposure_point' or 'polygon'. Default value is 'exposure_point'. probability_aggregation : float or str, optional Either a float between [0..1] spezifying a quantile or 'mean' or 'sum'. Default value is 0.5. area_aggregation : float or str. Either a float between [0..1] specifying a quantile or 'mean' or 'sum'. Default value is 0.5. run_datetime : datetime.datetime, optional Select the used hazard by the run_datetime, default is first element of attribute run_datetime. title : str, optional Default is 'WARNINGS'. explain_text : str, optional Defaut is 'warn level based on thresholds'. proj : ccrs coordinate reference system used in coordinates figsize : tuple figure size for plt.subplots, width, height in inches The default is (9, 13) save_fig : bool, optional Figure is saved if True, folder is within your configurable save_dir and filename is derived from the method summary_str() (for more details see class docstring). Default is True. close_fig : bool, optional Figure is not drawn if True. The default is False. adapt_fontsize : bool, optional If set to true, the size of the fonts will be adapted to the size of the figure. Otherwise, the default matplotlib font size is used. Default is True. Returns ------- axes : cartopy.mpl.geoaxes.GeoAxesSubplot """ # select hazard with run_datetime if thresholds == "default": thresholds = [2, 3, 4, 5] if run_datetime is None: run_datetime = self.run_datetime[0] warn_map_file_name = self.summary_str(run_datetime) + "_warn_map.jpeg" warn_map_file_name_full = FORECAST_PLOT_DIR / warn_map_file_name decision_dict = { "probability_aggregation": probability_aggregation, "area_aggregation": area_aggregation, } lead_time_str = "{:.0f}".format( self.lead_time(run_datetime).days + self.lead_time(run_datetime).seconds / 60 / 60 / 24 ) title_dict = { "event_day": self.event_date.strftime("%a %d %b %Y 00-24UTC"), "run_start": ( run_datetime.strftime("%d.%m.%Y %HUTC +") + lead_time_str + "d" ), "explain_text": explain_text, "model_text": title, } fig, axes = self._plot_warn( run_datetime, thresholds, decision_level, decision_dict, polygon_file, polygon_file_crs, title_dict, proj, figsize=figsize, adapt_fontsize=adapt_fontsize, ) if save_fig: plt.savefig(warn_map_file_name_full) if close_fig: plt.clf() plt.close(fig) return axes
def _plot_warn( self, run_datetime, thresholds, decision_level, decision_dict, polygon_file, polygon_file_crs, title, proj=ccrs.PlateCarree(), figsize=(9, 13), adapt_fontsize=True, ): """plotting the warning level of each warning region based on thresholds""" # select hazard with run_datetime # pylint: disable=protected-access if run_datetime is None: run_datetime = self.run_datetime[0] haz_ind = np.argwhere(np.isin(self.run_datetime, run_datetime))[0][0] kwargs = dict() kwargs["cmap"] = CMAP_WARNPROB kwargs["s"] = 5 kwargs["marker"] = "," kwargs["norm"] = BoundaryNorm(np.linspace(0, 1, 11), CMAP_WARNPROB.N, clip=True) # Generate each subplot fig, axis, _fontsize = u_plot.make_map( 1, proj=proj, figsize=figsize, adapt_fontsize=adapt_fontsize ) if isinstance(axis, np.ndarray): axis = axis[0] tit = title fig.set_size_inches(9, 8) # add warning regions shp = shapereader.Reader(polygon_file) transformer = pyproj.Transformer.from_crs( polygon_file_crs, self._impact[haz_ind].crs, always_xy=True ) # checking the decision dict and define the corresponding functions if not ( isinstance(decision_dict["probability_aggregation"], float) & isinstance(decision_dict["area_aggregation"], float) ): ValueError( " If decision_level is 'exposure_point'," + "parameters probability_aggregation and " + "area_aggregation of " + "Forecast.plot_warn_map() must both be " + "floats between [0..1]. Which each " + "specify quantiles." ) decision_dict_functions = decision_dict.copy() for aggregation in decision_dict: if isinstance(decision_dict[aggregation], float): decision_dict_functions[aggregation] = np.percentile elif decision_dict[aggregation] == "sum": decision_dict_functions[aggregation] = np.sum elif decision_dict[aggregation] == "mean": decision_dict_functions[aggregation] = np.mean else: raise ValueError( "Parameter " + aggregation + " of " + "Forecast.plot_warn_map() must eiter be " + "a float between [0..1], which " + "specifys a quantile. or 'sum' or 'mean'." ) for geometry, _ in zip(shp.geometries(), shp.records()): geom2 = shapely.ops.transform(transformer.transform, geometry) in_geom = u_coord.coord_on_land( lat=self._impact[haz_ind].coord_exp[:, 0], lon=self._impact[haz_ind].coord_exp[:, 1], land_geom=geom2, ) if not in_geom.any(): continue # decide warning level warn_level = 0 for ind_i, warn_thres_i in enumerate(thresholds): if decision_level == "exposure_point": # decision at each grid_point probabilities = np.squeeze( np.asarray( (self._impact[haz_ind].imp_mat >= warn_thres_i).sum(axis=0) / self._impact[haz_ind].event_id.size ) ) # quantiles over probability area = ( probabilities[in_geom] >= decision_dict["probability_aggregation"] ).sum() # quantiles over area if area >= (in_geom.sum() * decision_dict["area_aggregation"]): warn_level = ind_i + 1 elif decision_level == "polygon": # aggregation over area if isinstance(decision_dict["area_aggregation"], float): value_per_member = decision_dict_functions["area_aggregation"]( self._impact[haz_ind].imp_mat[:, in_geom].todense(), decision_dict["area_aggregation"], axis=1, ) else: value_per_member = decision_dict_functions["area_aggregation"]( self._impact[haz_ind].imp_mat[:, in_geom].todense(), axis=1 ) # aggregation over members/probability if isinstance(decision_dict["probability_aggregation"], float): value_per_region = decision_dict_functions[ "probability_aggregation" ](value_per_member, decision_dict["probability_aggregation"]) else: value_per_region = decision_dict_functions[ "probability_aggregation" ](value_per_member) # warn level decision if value_per_region >= warn_thres_i: warn_level = ind_i + 1 else: raise ValueError( "Parameter decision_level of " + "Forecast.plot_warn_map() must eiter be " + "'exposure_point' or 'polygon'." ) # plot warn_region with specific color (dependent on warning level) axis.add_geometries( [geom2], crs=ccrs.PlateCarree(), facecolor=COLORS_WARN[warn_level, :], edgecolor="gray", ) # Create legend in this axis hazard_levels = [ "1: Minimal or no hazard", "2: Moderate hazard", "3: Significant hazard", "4: Severe hazard", "5: Very severe hazard", ] legend_elements = [ Patch(facecolor=COLORS_WARN[n, :], edgecolor="gray", label=hazard_level) for n, hazard_level in enumerate(hazard_levels) ] axis.legend( handles=legend_elements, loc="upper center", framealpha=0.5, bbox_to_anchor=(0.5, -0.02), ncol=3, ) title_position = { "model_text": [0.02, 0.91], "explain_text": [0.02, 0.87], "event_day": [0.98, 0.91], "run_start": [0.98, 0.87], } left_right = { "model_text": "left", "explain_text": "left", "event_day": "right", "run_start": "right", } color = { "model_text": "k", "explain_text": "k", "event_day": "r", "run_start": "k", } for t_i in tit: plt.figtext( title_position[t_i][0], title_position[t_i][1], tit[t_i], fontsize="xx-large", color=color[t_i], ha=left_right[t_i], ) extent = u_plot._get_borders(self._impact[haz_ind].coord_exp) axis.set_extent((extent), ccrs.PlateCarree()) fig.tight_layout() return fig, axis
[docs] def plot_hexbin_ei_exposure(self, run_datetime=None, figsize=(9, 13)): """plot the expected impact Parameters ---------- run_datetime : datetime.datetime, optional Select the used hazard by the run_datetime, default is first element of attribute run_datetime. figsize : tuple figure size for plt.subplots, width, height in inches The default is (9, 13) Returns ------- cartopy.mpl.geoaxes.GeoAxesSubplot """ # select hazard with run_datetime if run_datetime is None: run_datetime = self.run_datetime[0] haz_ind = np.argwhere(np.isin(self.run_datetime, run_datetime))[0][0] return self._impact[haz_ind].plot_hexbin_eai_exposure(figsize=figsize)