"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define interpolation and extrapolation functions for calculating
(local) exceedance frequencies and return periods
"""
import logging
import numpy as np
from scipy import interpolate
LOGGER = logging.getLogger(__name__)
def preprocess_and_interpolate_ev(
    test_frequency,
    test_values,
    frequency,
    values,
    log_frequency=False,
    log_values=False,
    value_threshold=None,
    method="interpolate",
    y_asymptotic=np.nan,
    bin_decimals=None,
):
    """Preprocess (frequency, values) data and inter-/extrapolate exceedance curves.

    The (frequency, values) data are optionally binned by value (see Notes), the
    frequencies are transformed to cumulative (exceedance) frequencies, and the
    resulting curve is then inter- and extrapolated either at the given test
    frequencies or at the given test values (exactly one of the two must be given).

    Parameters
    ----------
    test_frequency : array_like
        1-D array of test frequencies for which values (e.g., intensities or impacts) should be
        assigned. If given, test_values must be None.
    test_values : array_like
        1-D array of test values (e.g., intensities or impacts) for which frequencies should be
        assigned. If given, test_frequency must be None.
    frequency : array_like
        1-D array of frequencies to be interpolated.
    values : array_like
        1-D array of values (e.g., intensities or impacts) to be interpolated.
    log_frequency : bool, optional
        If set to True, frequencies are interpolated on log scale. Defaults to False.
    log_values : bool, optional
        If set to True, values (e.g., intensities) are interpolated on log scale.
        Defaults to False.
    value_threshold : float, optional
        Lower threshold to filter values (e.g., intensities or impacts). Defaults to None.
    method : str, optional
        Method to interpolate to test x values. Currently available are
        "interpolate", "extrapolate", "extrapolate_constant" and "stepfunction". If set to
        "interpolate", test x values outside the range of the given x values will be assigned NaN.
        If set to "extrapolate_constant" or "stepfunction", test x values larger than given
        x values will be assigned largest given y value, and test x values smaller than the given
        x values will be assigned y_asymptotic. If set to "extrapolate", values will be
        extrapolated (and interpolated). The extrapolation to test frequencies or test values
        outside of the data range extends the two interpolations at the edges of the data to
        outside of the data. Defaults to "interpolate".
    y_asymptotic : float, optional
        Has no effect if method is "interpolate". Else, provides the return value for test x
        values larger than the given x values, if size < 2, or if method is set
        to "extrapolate_constant" or "stepfunction". Defaults to np.nan.
    bin_decimals : int, optional
        Number of decimals to group and bin the values. Binning results in smoother (and coarser)
        interpolation and more stable extrapolation. For more details and sensible values for
        bin_decimals, see Notes. If None, values are not binned. Defaults to None.

    Returns
    -------
    np.array
        interpolated (and extrapolated) values or frequencies for given test frequencies or test
        values, respectively.

    Raises
    ------
    ValueError
        If both test frequencies and test values are given or none of them.

    Notes
    -----
    If an integer bin_decimals is given, the values are binned according to their
    bin_decimals decimals, and their corresponding frequencies are summed. This binning leads to
    a smoother (and coarser) interpolation, and a more stable extrapolation. For instance, if
    bin_decimals=1, the two values 12.01 and 11.97 with corresponding frequencies 0.1 and 0.2 are
    combined to a value 12.0 with frequency 0.3. The default bin_decimals=None results in not
    binning the values.
    E.g., if your values range from 1 to 100, you could use bin_decimals=1, if your values range
    from 1e6 to 1e9, you could use bin_decimals=-5, if your values range from 0.0001 to .01, you
    could use bin_decimals=5.
    """
    # check that exactly one of test frequencies / test values is given
    if test_frequency is not None and test_values is not None:
        raise ValueError(
            "Both test frequencies and test values are given. This method only handles one of "
            "the two. To use this method, please only use one of them."
        )
    if test_frequency is None and test_values is None:
        raise ValueError("No test values or test frequencies are given.")

    # sort values in ascending order and reorder frequencies accordingly
    sorted_idxs = np.argsort(values)
    values = np.squeeze(values[sorted_idxs])
    frequency = frequency[sorted_idxs]

    # group similar values together (see Notes)
    if isinstance(bin_decimals, int):
        frequency, values = _group_frequency(frequency, values, bin_decimals)

    # transform frequencies to cumulative (exceedance) frequencies
    frequency = np.cumsum(frequency[::-1])[::-1]

    # if test frequencies are provided, interpolate values at those frequencies;
    # the helper expects x ascending, hence the [::-1] reversals
    if test_frequency is not None:
        if method == "stepfunction":
            return _stepfunction_ev(
                test_frequency,
                frequency[::-1],
                values[::-1],
                y_threshold=value_threshold,
                y_asymptotic=y_asymptotic,
            )
        extrapolation = None if method == "interpolate" else method
        return _interpolate_ev(
            test_frequency,
            frequency[::-1],
            values[::-1],
            logx=log_frequency,
            logy=log_values,
            y_threshold=value_threshold,
            extrapolation=extrapolation,
            y_asymptotic=y_asymptotic,
        )

    # if test values are provided, interpolate frequencies at those values
    if method == "stepfunction":
        return _stepfunction_ev(
            test_values,
            values,
            frequency,
            x_threshold=value_threshold,
            y_asymptotic=y_asymptotic,
        )
    extrapolation = None if method == "interpolate" else method
    # NOTE(review): y_asymptotic is deliberately not forwarded in this branch,
    # so out-of-range test values fall back to _interpolate_ev's default (np.nan)
    return _interpolate_ev(
        test_values,
        values,
        frequency,
        logx=log_values,
        logy=log_frequency,
        x_threshold=value_threshold,
        extrapolation=extrapolation,
    )
def _interpolate_ev(
    x_test,
    x_train,
    y_train,
    logx=False,
    logy=False,
    x_threshold=None,
    y_threshold=None,
    extrapolation=None,
    y_asymptotic=np.nan,
):
    """
    Interpolate (and optionally extrapolate) training data (x_train, y_train)
    at new points x_test, with optional log scaling and lower thresholds.

    Parameters
    ----------
    x_test : array_like
        1-D array of x-values for which training data should be interpolated
    x_train : array_like
        1-D array of x-values of training data sorted in ascending order
    y_train : array_like
        1-D array of y-values of training data
    logx : bool, optional
        If set to True, x_values are converted to log scale. Defaults to False.
    logy : bool, optional
        If set to True, y_values are converted to log scale. Defaults to False.
    x_threshold : float, optional
        Lower threshold to filter x_train. Defaults to None.
    y_threshold : float, optional
        Lower threshold to filter y_train. Defaults to None.
    extrapolation : str, optional
        If set to 'extrapolate', values will be extrapolated. If set to
        'extrapolate_constant', x_test values smaller than x_train will be assigned
        y_train[0] (x_train must be sorted in ascending order), and x_test values
        larger than x_train will be assigned y_asymptotic. If set to None, x_test
        values outside of the range of x_train will be assigned np.nan.
        Defaults to None.
    y_asymptotic : float, optional
        Has no effect if extrapolation is None. Else, provides the return value for
        x_test values larger than x_train, for x_train.size < 2, or if extrapolation
        is set to 'extrapolate_constant'. Defaults to np.nan.

    Returns
    -------
    np.array
        interpolated values y_test for the test points x_test
    """
    # filter below thresholds and convert to log scale where requested
    x_test, x_train, y_train = _preprocess_interpolation_data(
        x_test, x_train, y_train, logx, logy, x_threshold, y_threshold
    )

    # degenerate training data (zero or one point) cannot be interpolated
    if x_train.size < 2:
        if extrapolation:
            return _interpolate_small_input(x_test, x_train, y_train, logy, y_asymptotic)
        return np.full_like(x_test, np.nan)

    # determine fill values for test points outside the training range
    if extrapolation == "extrapolate":
        fill = "extrapolate"
    elif extrapolation == "extrapolate_constant":
        # upper fill must live on the same (possibly log) scale as y_train
        upper_fill = np.log10(y_asymptotic) if logy else y_asymptotic
        fill = (y_train[0], upper_fill)
    else:
        fill = np.nan

    interpolator = interpolate.interp1d(
        x_train, y_train, fill_value=fill, bounds_error=False
    )
    y_test = interpolator(x_test)

    # map results back from log scale if necessary
    return np.power(10.0, y_test) if logy else y_test
def _stepfunction_ev(
    x_test, x_train, y_train, x_threshold=None, y_threshold=None, y_asymptotic=np.nan
):
    """
    Evaluate a step function through the training data (x_train, y_train)
    at new points x_test.

    Parameters
    ----------
    x_test : array_like
        1-D array of x-values for which training data should be interpolated
    x_train : array_like
        1-D array of x-values of training data sorted in ascending order
    y_train : array_like
        1-D array of y-values of training data
    x_threshold : float, optional
        Lower threshold to filter x_train. Defaults to None.
    y_threshold : float, optional
        Lower threshold to filter y_train. Defaults to None.
    y_asymptotic : float, optional
        Return value if x_test > x_train. Defaults to np.nan.

    Returns
    -------
    np.array
        interpolated values y_test for the test points x_test
    """
    # filter below thresholds (no log scaling for step functions)
    x_test, x_train, y_train = _preprocess_interpolation_data(
        x_test, x_train, y_train, None, None, x_threshold, y_threshold
    )

    # degenerate training data (zero or one point)
    if x_train.size < 2:
        return _interpolate_small_input(x_test, x_train, y_train, None, y_asymptotic)

    # position of each test point if inserted into the sorted training x-values
    insert_pos = np.searchsorted(x_train, x_test)
    # use the next-larger training point's y-value (capped at the last index)
    y_test = y_train[np.clip(insert_pos, None, x_train.size - 1)]
    # test points beyond the largest training x-value get the asymptotic value
    y_test[insert_pos == x_train.size] = y_asymptotic
    return y_test
def _preprocess_interpolation_data(
x_test, x_train, y_train, logx, logy, x_threshold, y_threshold
):
"""
helper function to preprocess interpolation training and test data by filtering data below
thresholds and converting to log scale if required
"""
if x_train.shape != y_train.shape:
raise ValueError(
f"Incompatible shapes of input data, x_train {x_train.shape} "
f"and y_train {y_train.shape}. Should be the same"
)
# transform input to float arrays
x_test, x_train, y_train = (
np.array(x_test).astype(float),
np.array(x_train).astype(float),
np.array(y_train).astype(float),
)
# cut x and y above threshold
if x_threshold or x_threshold == 0:
x_th = np.asarray(x_train > x_threshold).squeeze()
x_train = x_train[x_th]
y_train = y_train[x_th]
if y_threshold or y_threshold == 0:
y_th = np.asarray(y_train > y_threshold).squeeze()
x_train = x_train[y_th]
y_train = y_train[y_th]
# convert to log scale
if logx:
x_train, x_test = np.log10(x_train), np.log10(x_test)
if logy:
y_train = np.log10(y_train)
return (x_test, x_train, y_train)
def _interpolate_small_input(x_test, x_train, y_train, logy, y_asymptotic):
"""
helper function to handle if interpolation data is small (empty or one point)
"""
# return y_asymptotic if x_train and y_train empty
if x_train.size == 0:
return np.full_like(x_test, y_asymptotic)
# reconvert logarithmic y_train to original y_train
if logy:
y_train = np.power(10.0, y_train)
# if only one (x_train, y_train), return stepfunction with
# y_train if x_test < x_train and y_asymtotic if x_test > x_train
y_test = np.full_like(x_test, y_train[0])
y_test[np.squeeze(x_test) > np.squeeze(x_train)] = y_asymptotic
return y_test
def _group_frequency(frequency, value, bin_decimals):
"""
Util function to aggregate (add) frequencies for equal values
Parameters
----------
frequency : array_like
Frequency array
value : array_like
Value array in ascending order
bin_decimals : int
decimals according to which values are binned and their corresponding frequency are
grouped.
Returns
-------
tuple
(frequency array after aggregation,
unique value array in ascending order)
"""
frequency, value = np.array(frequency), np.array(value)
if frequency.size == 0 and value.size == 0:
return ([], [])
# round values and group them
value = np.around(value, decimals=bin_decimals)
value_unique, start_indices = np.unique(value, return_index=True)
if value_unique.size != frequency.size:
if not all(sorted(start_indices) == start_indices):
LOGGER.warning(
"After grouping values using to their decimals, the value array is not sorted."
"The values are not binned. This might be due to floating point error while "
"binning. Please choose a larger value of bin_decimals=%s.",
bin_decimals,
)
return frequency, value
# add frequency for equal value
start_indices = np.insert(start_indices, value_unique.size, frequency.size)
frequency = np.add.reduceat(frequency, start_indices[:-1])
return frequency, value_unique
return frequency, value