"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Functions to merge EMDAT damages to hazard events.
"""
import logging
import pickle
from datetime import datetime
from pathlib import Path
import pandas as pd
import numpy as np
from iso3166 import countries as iso_cntry
from cartopy.io import shapereader
import shapefile
from climada.util.finance import gdp
from climada.util.constants import DEF_CRS
from climada.engine import Impact
from climada.entity.tag import Tag
from climada.hazard.tag import Tag as TagHaz
LOGGER = logging.getLogger(__name__)
PERIL_SUBTYPE_MATCH_DICT = dict(TC=['Tropical cyclone'],
FL=['Coastal flood'],
EQ=['Ground movement', 'Earthquake'],
RF=['Riverine flood', 'Flood'],
WS=['Extra-tropical storm', 'Storm'],
DR=['Drought'],
LS=['Landslide'],
BF=['Forest fire', 'Wildfire', 'Land fire (Brush, Bush, Pastur']
)
PERIL_TYPE_MATCH_DICT = dict(DR=['Drought'],
EQ=['Earthquake'],
FL=['Flood'],
LS=['Landslide'],
VQ=['Volcanic activity'],
BF=['Wildfire'],
HW=['Extreme temperature']
)
VARNAMES_EMDAT = \
{2018: {'Dis No': 'Disaster No.',
'Disaster Type': 'Disaster type',
'Disaster Subtype': 'Disaster subtype',
'Event Name': 'Disaster name',
'Country': 'Country',
'ISO': 'ISO',
'Location': 'Location',
'Associated Dis': 'Associated disaster',
'Associated Dis2': 'Associated disaster2',
'Dis Mag Value': 'Magnitude value',
'Dis Mag Scale': 'Magnitude scale',
'Latitude': 'Latitude',
'Longitude': 'Longitude',
'Total Deaths': 'Total deaths',
'Total Affected': 'Total affected',
"Insured Damages ('000 US$)": "Insured losses ('000 US$)",
"Total Damages ('000 US$)": "Total damage ('000 US$)"},
2020: {'Dis No': 'Dis No',
'Year': 'Year',
'Seq': 'Seq',
'Disaster Group': 'Disaster Group',
'Disaster Subgroup': 'Disaster Subgroup',
'Disaster Type': 'Disaster Type',
'Disaster Subtype': 'Disaster Subtype',
'Disaster Subsubtype': 'Disaster Subsubtype',
'Event Name': 'Event Name',
'Entry Criteria': 'Entry Criteria',
'Country': 'Country',
'ISO': 'ISO',
'Region': 'Region',
'Continent': 'Continent',
'Location': 'Location',
'Origin': 'Origin',
'Associated Dis': 'Associated Dis',
'Associated Dis2': 'Associated Dis2',
'OFDA Response': 'OFDA Response',
'Appeal': 'Appeal',
'Declaration': 'Declaration',
'Aid Contribution': 'Aid Contribution',
'Dis Mag Value': 'Dis Mag Value',
'Dis Mag Scale': 'Dis Mag Scale',
'Latitude': 'Latitude',
'Longitude': 'Longitude',
'Local Time': 'Local Time',
'River Basin': 'River Basin',
'Start Year': 'Start Year',
'Start Month': 'Start Month',
'Start Day': 'Start Day',
'End Year': 'End Year',
'End Month': 'End Month',
'End Day': 'End Day',
'Total Deaths': 'Total Deaths',
'No Injured': 'No Injured',
'No Affected': 'No Affected',
'No Homeless': 'No Homeless',
'Total Affected': 'Total Affected',
"Reconstruction Costs ('000 US$)": "Reconstruction Costs ('000 US$)",
"Insured Damages ('000 US$)": "Insured Damages ('000 US$)",
"Total Damages ('000 US$)": "Total Damages ('000 US$)",
'CPI': 'CPI'}}
[docs]def assign_hazard_to_emdat(certainty_level, intensity_path_haz, names_path_haz,
reg_id_path_haz, date_path_haz, emdat_data,
start_time, end_time, keep_checks=False):
"""assign_hazard_to_emdat: link EMdat event to hazard
Parameters:
input files (paths):
intensity: sparse matrix with hazards as rows and grid points as cols,
values only at location with impacts
names: identifier for each hazard (i.e. IBtracID) (rows of the matrix)
reg_id: ISO country ID of each grid point (cols of the matrix)
date: start date of each hazard (rows of the matrix)
emdat_data: pd.dataframe with EMdat data
start: start date of events to be assigned 'yyyy-mm-dd'
end: end date of events to be assigned 'yyyy-mm-dd'
disaster_subtype: EMdat disaster subtype
Returns:
pd.dataframe with EMdat entries linked to a hazard
"""
# check valid certainty level
certainty_levels = ['high', 'low']
if certainty_level not in certainty_levels:
raise ValueError("Invalid certainty level. Expected one of: %s" % certainty_levels)
# prepare hazard set
print("Start preparing hazard set")
hit_countries = hit_country_per_hazard(intensity_path_haz, names_path_haz,
reg_id_path_haz, date_path_haz)
# prepare damage set
# adjust emdat_data to the path!!
print("Start preparing damage set")
lookup = create_lookup(emdat_data, start_time, end_time, disaster_subtype='Tropical cyclone')
# calculate possible hits
print("Calculate possible hits")
hit5 = emdat_possible_hit(lookup=lookup, hit_countries=hit_countries, delta_t=5)
hit5_match = match_em_id(lookup=lookup, poss_hit=hit5)
print("1/5")
hit10 = emdat_possible_hit(lookup=lookup, hit_countries=hit_countries, delta_t=10)
hit10_match = match_em_id(lookup=lookup, poss_hit=hit10)
print("2/5")
hit15 = emdat_possible_hit(lookup=lookup, hit_countries=hit_countries, delta_t=15)
hit15_match = match_em_id(lookup=lookup, poss_hit=hit15)
print("3/5")
hit25 = emdat_possible_hit(lookup=lookup, hit_countries=hit_countries, delta_t=25)
hit25_match = match_em_id(lookup=lookup, poss_hit=hit25)
print("4/5")
hit50 = emdat_possible_hit(lookup=lookup, hit_countries=hit_countries, delta_t=50)
hit50_match = match_em_id(lookup=lookup, poss_hit=hit50)
print("5/5")
# assign only tracks with high certainty
print("Assign tracks")
if certainty_level == 'high':
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit10_match,
possible_tracks_2=hit50_match, level=1)
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit15_match,
possible_tracks_2=hit50_match, level=2)
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit25_match,
possible_tracks_2=hit50_match, level=3)
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit10_match,
possible_tracks_2=hit25_match, level=4)
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit15_match,
possible_tracks_2=hit25_match, level=5)
# assign all tracks
elif certainty_level == 'low':
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit5_match,
possible_tracks_2=hit50_match, level=1)
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit10_match,
possible_tracks_2=hit50_match, level=2)
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit15_match,
possible_tracks_2=hit50_match, level=3)
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit5_match,
possible_tracks_2=hit25_match, level=4)
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit10_match,
possible_tracks_2=hit25_match, level=5)
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit15_match,
possible_tracks_2=hit25_match, level=6)
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit5_match,
possible_tracks_2=hit15_match, level=7)
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit10_match,
possible_tracks_2=hit15_match, level=8)
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit5_match,
possible_tracks_2=hit10_match, level=9)
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit15_match,
possible_tracks_2=hit15_match, level=10)
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit10_match,
possible_tracks_2=hit10_match, level=11)
lookup = assign_track_to_em(lookup=lookup, possible_tracks_1=hit5_match,
possible_tracks_2=hit5_match, level=12)
if not keep_checks:
lookup = lookup.drop(['Date_start_EM_ordinal', 'possible_track',
'possible_track_all'], axis=1)
lookup.groupby('allocation_level').count()
print('(%d/%s) tracks allocated' % (
len(lookup[lookup.allocation_level.notnull()]), len(lookup)))
return lookup
[docs]def hit_country_per_hazard(intensity_path, names_path, reg_id_path, date_path):
"""hit_country_per_hazard: create list of hit countries from hazard set
Parameters:
input files:
intensity: sparse matrix with hazards as rows and grid points
as cols, values only at location with impacts
names: identifier for each hazard (i.e. IBtracID) (rows of the matrix)
reg_id: ISO country ID of each grid point (cols of the matrix)
date: start date of each hazard (rows of the matrix)
Returns:
pd.dataframe with all hit countries per hazard
"""
with open(intensity_path, 'rb') as filef:
inten = pickle.load(filef)
with open(names_path, 'rb') as filef:
names = pickle.load(filef)
with open(reg_id_path, 'rb') as filef:
reg_id = pickle.load(filef)
with open(date_path, 'rb') as filef:
date = pickle.load(filef)
# loop over the tracks (over the rows of the intensity matrix)
all_hits = []
for track in range(0, len(names)):
# select track
tc_track = inten[track, ]
# select only indices that are not zero
hits = tc_track.nonzero()[1]
# get the country of these indices and remove dublicates
hits = list(set(reg_id[hits]))
# append hit countries to list
all_hits.append(hits)
# create data frame for output
hit_countries = pd.DataFrame(columns=['hit_country', 'Date_start', 'ibtracsID'])
for track, _ in enumerate(names):
# Check if track has hit any country else go to the next track
if len(all_hits[track]) > 0:
# loop over hit_country
for hit in range(0, len(all_hits[track])):
# Hit country ISO
ctry_iso = iso_cntry.get(all_hits[track][hit]).alpha3
# create entry for each country a hazard has hit
hit_countries = hit_countries.append({'hit_country': ctry_iso,
'Date_start': date[track],
'ibtracsID': names[track]},
ignore_index=True)
# retrun data frame with all hit countries per hazard
return hit_countries
[docs]def create_lookup(emdat_data, start, end, disaster_subtype='Tropical cyclone'):
"""create_lookup: prepare a lookup table of EMdat events to which hazards can be assigned
Parameters:
emdat_data: pd.dataframe with EMdat data
start: start date of events to be assigned 'yyyy-mm-dd'
end: end date of events to be assigned 'yyyy-mm-dd'
disaster_subtype: EMdat disaster subtype
Returns:
pd.dataframe lookup
"""
data = emdat_data[emdat_data['Disaster_subtype'] == disaster_subtype]
lookup = pd.DataFrame(columns=['hit_country', 'Date_start_EM',
'Date_start_EM_ordinal', 'Disaster_name',
'EM_ID', 'ibtracsID', 'allocation_level',
'possible_track', 'possible_track_all'])
lookup.hit_country = data.ISO
lookup.Date_start_EM = data.Date_start_clean
lookup.Disaster_name = data.Disaster_name
lookup.EM_ID = data.Disaster_No
lookup = lookup.reset_index(drop=True)
# create ordinals
for i in range(0, len(data.Date_start_clean.values)):
lookup.Date_start_EM_ordinal[i] = datetime.toordinal(
datetime.strptime(lookup.Date_start_EM.values[i], '%Y-%m-%d'))
# ordinals to numeric
lookup.Date_start_EM_ordinal = pd.to_numeric(lookup.Date_start_EM_ordinal)
# select time
emdat_start = datetime.toordinal(datetime.strptime(start, '%Y-%m-%d'))
emdat_end = datetime.toordinal(datetime.strptime(end, '%Y-%m-%d'))
lookup = lookup[lookup.Date_start_EM_ordinal.values > emdat_start]
lookup = lookup[lookup.Date_start_EM_ordinal.values < emdat_end]
return lookup
# Function to relate EM disaster to IBtrack using hit countries and time
[docs]def emdat_possible_hit(lookup, hit_countries, delta_t):
"""relate EM disaster to hazard using hit countries and time
Parameters:
input files:
lookup: pd.dataframe to relate EMdatID to hazard
tracks: pd.dataframe with all hit countries per hazard
delta_t: max time difference of start of EMdat event and hazard
hit_countries:
start: start date of events to be assigned
end: end date of events to be assigned
disaster_subtype: EMdat disaster subtype
Returns:
list with possible hits
"""
# lookup: PD dataframe that relates EMdatID to an IBtracsID
# tracks: processed IBtracks with info which track hit which country
# delta_t: time difference of start of EMdat and IBrtacks
possible_hit_all = []
for i in range(0, len(lookup.EM_ID.values)):
possible_hit = []
country_tracks = hit_countries[
hit_countries['hit_country'] == lookup.hit_country.values[i]]
for j in range(0, len(country_tracks.Date_start.values)):
if (lookup.Date_start_EM_ordinal.values[i] - country_tracks.Date_start.values[j]) < \
delta_t and (lookup.Date_start_EM_ordinal.values[i] -
country_tracks.Date_start.values[j]) >= 0:
possible_hit.append(country_tracks.ibtracsID.values[j])
possible_hit_all.append(possible_hit)
return possible_hit_all
# function to check if EM_ID has been assigned already
[docs]def match_em_id(lookup, poss_hit):
"""function to check if EM_ID has been assigned already and combine possible hits
Parameters:
lookup: pd.dataframe to relate EMdatID to hazard
poss_hit: list with possible hits
Returns:
list with all possible hits per EMdat ID
"""
possible_hit_all = []
for i in range(0, len(lookup.EM_ID.values)):
possible_hit = []
# lookup without line i
#lookup_match = lookup.drop(i)
lookup_match = lookup
# Loop over check if EM dat ID is the same
for i_match in range(0, len(lookup_match.EM_ID.values)):
if lookup.EM_ID.values[i] == lookup_match.EM_ID.values[i_match]:
possible_hit.append(poss_hit[i])
possible_hit_all.append(possible_hit)
return possible_hit_all
[docs]def assign_track_to_em(lookup, possible_tracks_1, possible_tracks_2, level):
"""function to assign a hazard to an EMdat event
to get some confidene into the procedure, hazards get only assigned
if there is no other hazard occuring at a bigger time interval in that country
Thus a track of possible_tracks_1 gets only assigned if there are no other
tracks in possible_tracks_2.
The confidence can be expressed with a certainty level
Parameters:
lookup: pd.dataframe to relate EMdatID to hazard
possible_tracks_1: list of possible hits with smaller time horizon
possible_tracks_2: list of possible hits with larger time horizon
level: level of confidence
Returns:
pd.dataframe lookup with assigend tracks and possible hits
"""
for i, _ in enumerate(possible_tracks_1):
if np.isnan(lookup.allocation_level.values[i]):
number_emdat_id = len(possible_tracks_1[i])
# print(number_emdat_id)
for j in range(0, number_emdat_id):
# check that number of possible track stays the same at given
# time difference and that list is not empty
if len(possible_tracks_1[i][j]) == len(possible_tracks_2[i][j]) == 1 \
and possible_tracks_1[i][j] != []:
# check that all tracks are the same
if all(possible_tracks_1[i][0] == possible_tracks_1[i][k]
for k in range(0, len(possible_tracks_1[i]))):
# check that track ID has not been assigned to that country already
ctry_lookup = lookup[lookup['hit_country'] == lookup.hit_country.values[i]]
if possible_tracks_1[i][0][0] not in ctry_lookup.ibtracsID.values:
lookup.ibtracsID.values[i] = possible_tracks_1[i][0][0]
lookup.allocation_level.values[i] = level
elif possible_tracks_1[i][j] != []:
lookup.possible_track.values[i] = possible_tracks_1[i]
else:
lookup.possible_track_all.values[i] = possible_tracks_1[i]
return lookup
[docs]def check_assigned_track(lookup, checkset):
"""compare lookup with assigned tracks to a set with checked sets
Parameters:
lookup: pd.dataframe to relate EMdatID to hazard
checkset: pd.dataframe with already checked hazards
Returns:
error scores
"""
# merge checkset and lookup
check = pd.merge(checkset, lookup[['hit_country', 'EM_ID', 'ibtracsID']],
on=['hit_country', 'EM_ID'])
check_size = len(check.ibtracsID.values)
# not assigned values
not_assigned = check.ibtracsID.isnull().sum(axis=0)
# correct assigned values
correct = sum(check.ibtracsID.values == check.IBtracsID_checked.values)
# wrongly assigned values
wrong = len(check.ibtracsID.values) - not_assigned - correct
print('%.1f%% tracks assigned correctly, %.1f%% wrongly, %.1f%% not assigned'
% (correct / check_size * 100,
wrong / check_size * 100,
not_assigned / check_size * 100))
[docs]def clean_emdat_df(emdat_file, countries=None, hazard=None, year_range=None,
target_version=2020):
"""
Get a clean and standardized DataFrame from EM-DAT-CSV-file
(1) load EM-DAT data from CSV to DataFrame and remove header/footer,
(2) handle version, clean up, and add columns, and
(3) filter by country, hazard type and year range (if any given)
Parameters:
emdat_file (str, Path, or DataFrame): Either string with full path to CSV-file or
pandas.DataFrame loaded from EM-DAT CSV
Optional parameters:
countries (list of str): country ISO3-codes or names, e.g. ['JAM', 'CUB'].
countries=None for all countries (default)
hazard (list or str): List of Disaster (sub-)type accordung EMDAT terminology, i.e.:
Animal accident, Drought, Earthquake, Epidemic, Extreme temperature,
Flood, Fog, Impact, Insect infestation, Landslide, Mass movement (dry),
Storm, Volcanic activity, Wildfire;
Coastal Flooding, Convective Storm, Riverine Flood, Tropical cyclone,
Tsunami, etc.;
OR CLIMADA hazard type abbreviations, e.g. TC, BF, etc.
year_range (list or tuple): Year range to be extracted, e.g. (2000, 2015);
(only min and max are considered)
target_version (int): required EM-DAT data format version (i.e. year of download),
changes naming of columns/variables (default: 2020)
Returns:
df_data (pandas.DataFrame): DataFrame containing cleaned and filtered EM-DAT impact data
"""
# (1) load EM-DAT data from CSV to DataFrame, skipping the header:
if isinstance(emdat_file, (str, Path)):
df_emdat = pd.read_csv(emdat_file, encoding="ISO-8859-1", header=0)
counter = 0
while not ('Country' in df_emdat.columns and 'ISO' in df_emdat.columns):
counter += 1
df_emdat = pd.read_csv(emdat_file, encoding="ISO-8859-1", header=counter)
if counter == 10:
break
del counter
elif isinstance(emdat_file, pd.DataFrame):
df_emdat = emdat_file
else:
LOGGER.error('TypeError: emdat_file needs to be str or DataFrame')
return None
# drop rows with 9 or more NaN values (e.g. footer):
df_emdat = df_emdat.dropna(thresh=9)
# (2) handle version, clean up, and add columns:
# (2.1) identify underlying EMDAT version of csv:
version = 2020
for vers in list(VARNAMES_EMDAT.keys()):
if len(df_emdat.columns) >= len(VARNAMES_EMDAT[vers]) and \
all(item in list(df_emdat.columns) for item in VARNAMES_EMDAT[vers].values()):
version = vers
# (2.2) create new DataFrame df_data with column names as target version
df_data = pd.DataFrame(index=df_emdat.index.values,
columns=VARNAMES_EMDAT[target_version].values())
if 'Year' not in df_data.columns: # make sure column "Year" exists
df_data['Year'] = np.nan
for _, col in enumerate(df_data.columns): # loop over columns
if col in VARNAMES_EMDAT[version]:
df_data[col] = df_emdat[VARNAMES_EMDAT[version][col]]
elif col in df_emdat.columns:
df_data[col] = df_emdat[col]
elif col == 'Year' and version <= 2018:
years_list = list()
for _, disaster_no in enumerate(df_emdat[VARNAMES_EMDAT[version]['Dis No']]):
if isinstance(disaster_no, str):
years_list.append(int(disaster_no[0:4]))
else:
years_list.append(np.nan)
df_data[col] = years_list
if version <= 2018 and target_version >= 2020:
date_list = list()
year_list = list()
month_list = list()
day_list = list()
for year in list(df_data['Year']):
if not np.isnan(year):
date_list.append(datetime.strptime(str(year), '%Y'))
else:
date_list.append(datetime.strptime(str('0001'), '%Y'))
boolean_warning = True
for idx, datestr in enumerate(list(df_emdat['Start date'])):
try:
date_list[idx] = datetime.strptime(datestr[-7:], '%m/%Y')
except ValueError:
if boolean_warning:
LOGGER.warning('EM_DAT CSV contains invalid time formats')
boolean_warning = False
try:
date_list[idx] = datetime.strptime(datestr, '%d/%m/%Y')
except ValueError:
if boolean_warning:
LOGGER.warning('EM_DAT CSV contains invalid time formats')
boolean_warning = False
day_list.append(date_list[idx].day)
month_list.append(date_list[idx].month)
year_list.append(date_list[idx].year)
df_data['Start Month'] = np.array(month_list, dtype='int')
df_data['Start Day'] = np.array(day_list, dtype='int')
df_data['Start Year'] = np.array(year_list, dtype='int')
for var in ['Disaster Subtype', 'Disaster Type', 'Country']:
df_data[VARNAMES_EMDAT[target_version][var]].fillna('None', inplace=True)
# (3) Filter by countries, year range, and disaster type
# (3.1) Countries:
if countries and isinstance(countries, str):
countries = [countries]
if countries and isinstance(countries, list):
for idx, country in enumerate(countries):
# convert countries to iso3 alpha code:
countries[idx] = iso_cntry.get(country).alpha3
df_data = df_data[df_data['ISO'].isin(countries)].reset_index(drop=True)
# (3.2) Year range:
if year_range:
for idx in df_data.index:
if np.isnan(df_data.loc[0, 'Year']):
df_data.loc[0, 'Year'] = \
df_data.loc[0, VARNAMES_EMDAT[target_version]['Start Year']]
df_data = df_data[(df_data['Year'] >= min(year_range)) &
(df_data['Year'] <= max(year_range))]
# (3.3) Disaster type:
if hazard and isinstance(hazard, str):
hazard = [hazard]
if hazard and isinstance(hazard, list):
disaster_types = list()
disaster_subtypes = list()
for idx, haz in enumerate(hazard):
if haz in df_data[VARNAMES_EMDAT[target_version]['Disaster Type']].unique():
disaster_types.append(haz)
if haz in df_data[VARNAMES_EMDAT[target_version]['Disaster Subtype']].unique():
disaster_subtypes.append(haz)
if haz in PERIL_TYPE_MATCH_DICT.keys():
disaster_types += PERIL_TYPE_MATCH_DICT[haz]
if haz in PERIL_SUBTYPE_MATCH_DICT.keys():
disaster_subtypes += PERIL_SUBTYPE_MATCH_DICT[haz]
df_data = df_data[
(df_data[VARNAMES_EMDAT[target_version]['Disaster Type']].isin(disaster_types)) |
(df_data[VARNAMES_EMDAT[target_version]['Disaster Subtype']].isin(disaster_subtypes))]
return df_data.reset_index(drop=True)
[docs]def emdat_countries_by_hazard(emdat_file_csv, hazard=None, year_range=None):
"""return list of all countries exposed to a chosen hazard type
from EMDAT data as CSV.
Parameters:
emdat_file (str or DataFrame): Either string with full path to CSV-file or
pandas.DataFrame loaded from EM-DAT CSV
Optional Parameters:
hazard (list or str): List of Disaster (sub-)type accordung EMDAT terminology, i.e.:
Animal accident, Drought, Earthquake, Epidemic, Extreme temperature,
Flood, Fog, Impact, Insect infestation, Landslide, Mass movement (dry),
Storm, Volcanic activity, Wildfire;
Coastal Flooding, Convective Storm, Riverine Flood, Tropical cyclone,
Tsunami, etc.;
OR CLIMADA hazard type abbreviations, e.g. TC, BF, etc.:
year_range (tuple of integers or None):
range of years to consider, i.e. (1950, 2000)
default is None, i.e. consider all years
Returns:
countries_iso3a : list
List of ISO3-codes of countries impacted by the disaster (sub-)types
countries_names : list
List of names of countries impacted by the disaster (sub-)types
"""
df_data = clean_emdat_df(emdat_file_csv, hazard=hazard, year_range=year_range)
countries_iso3a = list(df_data.ISO.unique())
countries_names = list()
for iso3a in countries_iso3a:
try:
countries_names.append(iso_cntry.get(iso3a).name)
except KeyError:
countries_names.append('NA')
return countries_iso3a, countries_names
[docs]def scale_impact2refyear(impact_values, year_values, iso3a_values, reference_year=None):
"""Scale give impact values proportional to GDP to the according value in a reference year
(for normalization of monetary values)
Parameters:
impact_values (list or array):
Impact values to be scaled.
year_values (list or array):
Year of each impact (same length as impact_values)
iso3a_values (list or array):
ISO3alpha code of country for each impact (same length as impact_values)
Optional Parameters:
reference_year (int):
Impact is scaled proportional to GDP to the value of the reference year.
No scaling for reference_year=None (default)
"""
impact_values = np.array(impact_values)
year_values = np.array(year_values)
iso3a_values = np.array(iso3a_values)
if reference_year and isinstance(reference_year, (int, float)):
reference_year = int(reference_year)
gdp_ref = dict()
gdp_years = dict()
for country in np.unique(iso3a_values):
# get reference GDP value for each country:
gdp_ref[country] = gdp(country, reference_year)[1]
# get GDP value for each country and year:
gdp_years[country] = dict()
years_country = np.unique(year_values[iso3a_values == country])
print(years_country)
for year in years_country:
gdp_years[country][year] = gdp(country, year)[1]
# loop through each value and apply scaling:
for idx, val in enumerate(impact_values):
impact_values[idx] = val * gdp_ref[iso3a_values[idx]] / \
gdp_years[iso3a_values[idx]][year_values[idx]]
return list(impact_values)
if not reference_year:
return impact_values
LOGGER.error('Invalid reference_year')
return None
[docs]def emdat_impact_yearlysum(emdat_file_csv, countries=None, hazard=None, year_range=None,
reference_year=None, imp_str="Total Damages ('000 US$)",
version=2020):
"""function to load EM-DAT data and sum impact per year
Parameters:
emdat_file (str or DataFrame): Either string with full path to CSV-file or
pandas.DataFrame loaded from EM-DAT CSV
Optional parameters:
countries (list of str): country ISO3-codes or names, e.g. ['JAM', 'CUB'].
countries=None for all countries (default)
hazard (list or str): List of Disaster (sub-)type accordung EMDAT terminology, i.e.:
Animal accident, Drought, Earthquake, Epidemic, Extreme temperature,
Flood, Fog, Impact, Insect infestation, Landslide, Mass movement (dry),
Storm, Volcanic activity, Wildfire;
Coastal Flooding, Convective Storm, Riverine Flood, Tropical cyclone,
Tsunami, etc.;
OR CLIMADA hazard type abbreviations, e.g. TC, BF, etc.
year_range (list or tuple): Year range to be extracted, e.g. (2000, 2015);
(only min and max are considered)
version (int): given EM-DAT data format version (i.e. year of download),
changes naming of columns/variables (default: 2020)
reference_year (int): reference year of exposures. Impact is scaled
proportional to GDP to the value of the reference year. No scaling
for 0 (default)
imp_str (str): Column name of impact metric in EMDAT CSV,
default = "Total Damages ('000 US$)"
Returns:
out (pd.DataFrame): DataFrame with summed impact and scaled impact per
year and country.
"""
imp_str = VARNAMES_EMDAT[version][imp_str]
df_data = clean_emdat_df(emdat_file_csv, countries=countries, hazard=hazard,
year_range=year_range, target_version=version)
df_data[imp_str + " scaled"] = scale_impact2refyear(df_data[imp_str].values,
df_data.Year.values, df_data.ISO.values,
reference_year=reference_year)
out = pd.DataFrame(columns=['ISO', 'region_id', 'year', 'impact',
'impact_scaled', 'reference_year'])
for country in df_data.ISO.unique():
country = iso_cntry.get(country).alpha3
if not df_data.loc[df_data.ISO == country].size:
continue
all_years = np.arange(min(df_data.Year), max(df_data.Year) + 1)
data_out = pd.DataFrame(index=np.arange(0, len(all_years)),
columns=out.columns)
df_country = df_data.loc[df_data.ISO == country]
for cnt, year in enumerate(all_years):
data_out.loc[cnt, 'year'] = year
data_out.loc[cnt, 'reference_year'] = reference_year
data_out.loc[cnt, 'ISO'] = country
data_out.loc[cnt, 'region_id'] = int(iso_cntry.get(country).numeric)
data_out.loc[cnt, 'impact'] = \
np.nansum(df_country[df_country.Year.isin([year])][imp_str])
data_out.loc[cnt, 'impact_scaled'] = \
np.nansum(df_country[df_country.Year.isin([year])][imp_str + " scaled"])
if '000 US' in imp_str: # EM-DAT damages provided in '000 USD
data_out.loc[cnt, 'impact'] = data_out.loc[cnt, 'impact'] * 1e3
data_out.loc[cnt, 'impact_scaled'] = data_out.loc[cnt, 'impact_scaled'] * 1e3
out = out.append(data_out)
out = out.reset_index(drop=True)
return out
[docs]def emdat_impact_event(emdat_file_csv, countries=None, hazard=None, year_range=None,
reference_year=None, imp_str="Total Damages ('000 US$)",
version=2020):
"""function to load EM-DAT data return impact per event
Parameters:
emdat_file_csv (str): Full path to EMDAT-file (CSV), i.e.:
emdat_file_csv = SYSTEM_DIR.joinpath('emdat_201810.csv')
Optional parameters:
countries (list of str): country ISO3-codes or names, e.g. ['JAM', 'CUB'].
countries=None for all countries (default)
hazard (list or str): List of Disaster (sub-)type accordung EMDAT terminology, i.e.:
Animal accident, Drought, Earthquake, Epidemic, Extreme temperature,
Flood, Fog, Impact, Insect infestation, Landslide, Mass movement (dry),
Storm, Volcanic activity, Wildfire;
Coastal Flooding, Convective Storm, Riverine Flood, Tropical cyclone,
Tsunami, etc.;
OR CLIMADA hazard type abbreviations, e.g. TC, BF, etc.
year_range (list or tuple): Year range to be extracted, e.g. (2000, 2015);
(only min and max are considered)
reference_year (int): reference year of exposures. Impact is scaled
proportional to GDP to the value of the reference year. No scaling
for 0 (default)
imp_str (str): Column name of impact metric in EMDAT CSV,
default = "Total Damages ('000 US$)"
version (int): EM-DAT version to take variable/column names from (defaul: 2020)
Returns:
out (pandas DataFrame): EMDAT DataFrame with new columns "year",
"region_id", and "impact" and +impact_scaled" total impact per event with
same unit as chosen impact, but multiplied by 1000 if impact is given
as 1000 US$ (e.g. imp_str="Total Damages ('000 US$) scaled").
"""
imp_str = VARNAMES_EMDAT[version][imp_str]
df_data = clean_emdat_df(emdat_file_csv, hazard=hazard, year_range=year_range,
countries=countries, target_version=version)
df_data['year'] = df_data['Year']
df_data['reference_year'] = reference_year
df_data['impact'] = df_data[imp_str]
df_data['impact_scaled'] = scale_impact2refyear(df_data[imp_str].values, df_data.Year.values,
df_data.ISO.values,
reference_year=reference_year)
df_data['region_id'] = np.nan
for country in df_data.ISO.unique():
try:
df_data.loc[df_data.ISO == country, 'region_id'] = \
int(iso_cntry.get(country).numeric)
except KeyError:
LOGGER.warning('ISO3alpha code not found in iso_country: %s', country)
if '000 US' in imp_str:
df_data['impact'] *= 1e3
df_data['impact_scaled'] *= 1e3
return df_data.reset_index(drop=True)
[docs]def emdat_to_impact(emdat_file_csv, hazard_type_climada, year_range=None, countries=None,
hazard_type_emdat=None,
reference_year=None, imp_str="Total Damages"):
"""function to load EM-DAT data return impact per event
Parameters:
emdat_file_csv (str): Full path to EMDAT-file (CSV), i.e.:
emdat_file_csv = SYSTEM_DIR.joinpath('emdat_201810.csv')
hazard_type_climada (str): Hazard type CLIMADA abbreviation,
i.e. 'TC' for tropical cyclone
Optional parameters:
hazard_type_emdat (list or str): List of Disaster (sub-)type accordung
EMDAT terminology, e.g.:
Animal accident, Drought, Earthquake, Epidemic, Extreme temperature,
Flood, Fog, Impact, Insect infestation, Landslide, Mass movement (dry),
Storm, Volcanic activity, Wildfire;
Coastal Flooding, Convective Storm, Riverine Flood, Tropical cyclone,
Tsunami, etc.;
OR CLIMADA hazard type abbreviations, e.g. TC, BF, etc.
If not given, it is deducted from hazard_type_climada
year_range (list with 2 integers): start and end year e.g. [1980, 2017]
default: None --> take year range from EM-DAT file
countries (list of str): country ISO3-codes or names, e.g. ['JAM'].
Set to None or ['all'] for all countries (default)
reference_year (int): reference year of exposures. Impact is scaled
proportional to GDP to the value of the reference year. No scaling
for reference_year=0 (default)
imp_str (str): Column name of impact metric in EMDAT CSV,
default = "Total Damages ('000 US$)"
Returns:
impact_instance (instance of climada.engine.Impact):
impact object of same format as output from CLIMADA
impact computation.
Values scaled with GDP to reference_year if reference_year is given.
i.e. current US$ for imp_str="Total Damages ('000 US$) scaled" (factor 1000 is applied)
impact_instance.eai_exp holds expected annual impact for each country.
impact_instance.coord_exp holds rough central coordinates for each country.
countries (list): ISO3-codes of countries in same order as in impact_instance.eai_exp
"""
if "Total Damages" in imp_str:
imp_str = "Total Damages ('000 US$)"
elif "Insured Damages" in imp_str:
imp_str = "Insured Damages ('000 US$)"
elif "Reconstruction Costs" in imp_str:
imp_str = "Reconstruction Costs ('000 US$)"
imp_str = VARNAMES_EMDAT[max(VARNAMES_EMDAT.keys())][imp_str]
if not hazard_type_emdat:
hazard_type_emdat = [hazard_type_climada]
if reference_year == 0:
reference_year = None
# Inititate Impact-instance:
impact_instance = Impact()
impact_instance.tag = dict()
impact_instance.tag['haz'] = TagHaz(haz_type=hazard_type_climada,
file_name=emdat_file_csv,
description='EM-DAT impact, direct import')
impact_instance.tag['exp'] = Tag(file_name=emdat_file_csv,
description='EM-DAT impact, direct import')
impact_instance.tag['if_set'] = Tag(file_name=None, description=None)
# Load EM-DAT impact data by event:
em_data = emdat_impact_event(emdat_file_csv, countries=countries, hazard=hazard_type_emdat,
year_range=year_range, reference_year=reference_year,
imp_str=imp_str, version=max(VARNAMES_EMDAT.keys()))
if isinstance(countries, str):
countries = [countries]
elif not countries:
countries = emdat_countries_by_hazard(emdat_file_csv, year_range=year_range,
hazard=hazard_type_emdat)[0]
if em_data.empty:
return impact_instance, countries
impact_instance.event_id = np.array(em_data.index, int)
impact_instance.event_name = list(
em_data[VARNAMES_EMDAT[max(VARNAMES_EMDAT.keys())]['Dis No']])
date_list = list()
for year in list(em_data['Year']):
date_list.append(datetime.toordinal(datetime.strptime(str(year), '%Y')))
if 'Start Year' in em_data.columns and 'Start Month' in em_data.columns \
and 'Start Day' in em_data.columns:
idx = 0
for year, month, day in zip(em_data['Start Year'], em_data['Start Month'],
em_data['Start Day']):
if np.isnan(year):
idx += 1
continue
if np.isnan(month):
month = 1
if np.isnan(day):
day = 1
date_list[idx] = datetime.toordinal(datetime.strptime(
'%02i/%02i/%04i' % (day, month, year), '%d/%m/%Y'))
idx += 1
impact_instance.date = np.array(date_list, int)
impact_instance.crs = DEF_CRS
if not reference_year:
impact_instance.at_event = np.array(em_data["impact"])
else:
impact_instance.at_event = np.array(em_data["impact_scaled"])
impact_instance.at_event[np.isnan(impact_instance.at_event)] = 0
if not year_range:
year_range = [em_data['Year'].min(), em_data['Year'].max()]
impact_instance.frequency = np.ones(em_data.shape[0]) / (1 + np.diff(year_range))
impact_instance.tot_value = 0
impact_instance.aai_agg = np.nansum(impact_instance.at_event * impact_instance.frequency)
impact_instance.unit = 'USD'
impact_instance.imp_mat = []
# init rough exposure with central point per country
shp = shapereader.natural_earth(resolution='110m',
category='cultural',
name='admin_0_countries')
shp = shapefile.Reader(shp)
countries_reg_id = list()
countries_lat = list()
countries_lon = list()
impact_instance.eai_exp = np.zeros(len(countries)) # empty: damage at exposure
for idx, cntry in enumerate(countries):
try:
cntry = iso_cntry.get(cntry).alpha3
except KeyError:
print(cntry)
LOGGER.error('Country not found in iso_country: %s', cntry)
cntry_boolean = False
for rec_i, rec in enumerate(shp.records()):
if rec[9].casefold() == cntry.casefold():
bbox = shp.shapes()[rec_i].bbox
cntry_boolean = True
break
if cntry_boolean:
countries_lat.append(np.mean([bbox[1], bbox[3]]))
countries_lon.append(np.mean([bbox[0], bbox[2]]))
else:
countries_lat.append(np.nan)
countries_lon.append(np.nan)
try:
countries_reg_id.append(int(iso_cntry.get(cntry).numeric))
except KeyError:
countries_reg_id.append(0)
df_tmp = em_data[em_data[VARNAMES_EMDAT[
max(VARNAMES_EMDAT.keys())]['ISO']].str.contains(cntry)]
if not reference_year:
impact_instance.eai_exp[idx] = sum(np.array(df_tmp["impact"]) *
impact_instance.frequency[0])
else:
impact_instance.eai_exp[idx] = sum(np.array(df_tmp["impact_scaled"]) *
impact_instance.frequency[0])
impact_instance.coord_exp = np.stack([countries_lat, countries_lon], axis=1)
return impact_instance, countries