"""Retrieval of IWP from passive radiometers
This class is a reimplementation of the SPARE-ICE product introduced by
Holl et al. 2014.
References:
TODO: Add reference.
Examples:
.. code-block:: python
from typhon.files import AVHRR_GAC_HDF, CloudSat, FileSet, MHS_HDF
from typhon.retrieval import SPAREICE
cloudsat = FileSet(...)
mhs = FileSet(...)
avhrr = FileSet(...)
spareice = SPAREICE(
file="spareice.json",
)
# Either we have already collocated, then we can use the files directly for
# the training or SPARE-ICE should create the training dataset by itself
# (should collocate by itself).
data = spareice.prepare_training_data(
# Do we have already collocations with all instruments? Put them here:
collocations=...,
# OR
cloudsat=cloudsat, mhs=mhs, avhrr=avhrr,
# Which time period should be used for training?
start=..., end=...,
)
# To save time and memory space, we can store the current object with
# the training data to the disk and reuse it later directly. So, we do not
# have to call spareice.prepare_training_data again:
data.to_netcdf("spareice_training.nc")
# Train SPARE-ICE with the data
spareice.train(data, test_ratio=0.2)
# After training, we can use the SPARE-ICE retrieval:
spareice.retrieve(
# Do we have already collocations with MHS and AVHRR? Put them here:
collocations=...,
# Otherwise we can put here each fileset and create collocations
# on-the-fly
mhs=mhs, avhrr=avhrr,
# Which time period should be used for retrieving?
start=..., end=...,
output=...,
)
"""
from ast import literal_eval
import itertools
import logging
import os
from os.path import join, dirname
import warnings
import imageio
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.stats import binned_statistic as sci_binned_statistic
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import GridSearchCV
from sklearn.neural_network import MLPRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import RobustScaler
from sklearn.tree import DecisionTreeClassifier
from typhon.collocations import collapse, Collocations, Collocator
from typhon.geographical import sea_mask
from typhon.plots import binned_statistic, heatmap
from typhon.utils import to_array, Timer
import xarray as xr
from ..common import RetrievalProduct
__all__ = [
'SPAREICE',
]
# The path to the standard weights:
PARAMETERS_DIR = join(dirname(__file__), 'parameters')
STANDARD_FILE = join(PARAMETERS_DIR, "standard.json")
logger = logging.getLogger(__name__)
[docs]class SPAREICE:
"""Retrieval of IWP from passive radiometers
Examples:
.. code-block:: python
import pandas as pd
from typhon.retrieval import SPAREICE
# Create a SPARE-ICE object with the standard weights
spareice = SPAREICE()
# Print the required input fields
print(spareice.inputs)
# If you want to know the input fields for the each component, IWP
# regressor and ice cloud classifier, you can get them like this:
print(spareice.iwp.inputs) # Inputs from IWP regressor
print(spareice.ice_cloud.inputs) # Inputs from ice cloud classifier
# If you have your own input data, you can use :meth:`retrieve` to run
# SPARE-ICE on it.
data = pd.DataFrame(...)
retrieved = spareice.retrieve(data)
# If your data directly comes from collocations between MHS and AVHRR,
# you can use :meth:`standardize_collocations` to make it SPARE-ICE
# compatible.
collocations = Collocator().collocate(mhs_data, avhrr_data, ...)
standardized_data = self.standardize_collocations(collocations)
retrieved = spareice.retrieve(standardized_data)
"""
def __init__(self, file=None, collocator=None, processes=10, verbose=0,
             sea_mask_file=None, elevation_file=None):
    """Initialize a SPAREICE object

    Args:
        file: A JSON file with the coefficients of SPAREICE. If not given,
            the standard configuration will be loaded.
        collocator: SPARE-ICE requires a collocator when it should be
            generated from filesets. You can pass your own
            :class:`Collocator` object here if you want.
        processes: Number of processes to parallelize the training or
            collocation search. 10 is the default. Best value depends on
            your machine.
        verbose (int): Control ``GridSearchCV`` verbosity. The higher the
            value, the more debug messages are printed.
        sea_mask_file: Path to an image file with a land-sea mask; pixels
            with the value 255 are interpreted as sea. If not given, no
            sea mask is available.
        elevation_file: Path to a netCDF file whose ``data`` variable
            holds a surface elevation grid. If not given, no elevation
            data is available.
    """
    self.verbose = verbose
    self.processes = processes
    self.name = "SPARE-ICE"
    if sea_mask_file is None:
        self.sea_mask = None
    else:
        # Convert the mask image to booleans (255 == sea) and flip it
        # vertically -- presumably so latitudes increase with the row
        # index as expected by sea_mask(); TODO confirm.
        self.sea_mask = np.flip(
            np.array(imageio.imread(sea_mask_file) == 255), axis=0
        )
    if elevation_file is None:
        self.elevation_grid = None
    else:
        # decode_times=False: time decoding is irrelevant for a static
        # elevation grid and may fail on non-standard units.
        ds = xr.open_dataset(
            elevation_file, decode_times=False
        )
        self.elevation_grid = ds.data.squeeze().values
    if collocator is None:
        self.collocator = Collocator()
    else:
        self.collocator = collocator
    # SPARE-ICE consists of two retrievals: one neural network for the IWP
    # and one decision tree classifier for the ice cloud flag
    self._iwp = None
    self._ice_cloud = None
    # The users can load SPARE-ICE from their own training or the standard
    # parameters:
    if file is None:
        try:
            self.load(STANDARD_FILE)
        except Exception as e:
            # Best-effort: fall back to empty, untrained retrieval
            # products instead of failing on construction.
            warnings.warn(
                "Could not load the standard parameters of SPARE-ICE!\n"
                "You need to train SPARE-ICE by yourself."
            )
            warnings.warn(str(e))
            self._iwp = RetrievalProduct()
            self._ice_cloud = RetrievalProduct()
    else:
        self.load(file)
def _debug(self, msg):
    # Emit a debug message prefixed with the product name.
    logger.debug("[{}] {}".format(self.name, msg))
def _info(self, msg):
    # Emit an info message prefixed with the product name.
    logger.info("[{}] {}".format(self.name, msg))
def _iwp_model(self, processes, cv_folds):
    """Build the default grid-search trainer for the IWP regressor.

    Args:
        processes: Number of parallel jobs for the grid search.
        cv_folds: Number of cross-validation folds.

    Returns:
        A GridSearchCV object wrapping a scaler + MLP pipeline.
    """
    # Neural networks train much better on scaled inputs, so the
    # pipeline chains a scaler with the regressor. A Pipeline behaves
    # like a single estimator with fit/predict methods.
    pipeline = Pipeline([
        # RobustScaler is less sensitive to outliers than MinMaxScaler
        # or StandardScaler would be.
        ("scaler", RobustScaler(quantile_range=(15, 85))),
        # The actual regressor:
        ("estimator", MLPRegressor(max_iter=6000, early_stopping=True)),
    ])
    # Candidate hyper parameters that the grid search will try out
    # (lbfgs solver only):
    param_grid = [{
        'estimator__solver': ['lbfgs'],
        'estimator__activation': ['tanh'],
        'estimator__hidden_layer_sizes': [(15, 10, 3)],
        'estimator__random_state': [0, 42, 100, 3452],
        'estimator__alpha': [0.1, 0.001, 0.0001],
    }]
    return GridSearchCV(
        pipeline, param_grid, refit=True, n_jobs=processes,
        cv=cv_folds, verbose=self.verbose,
    )
@staticmethod
def _ice_cloud_model():
    """Build the default estimator for the ice cloud classifier.

    Returns:
        A DecisionTreeClassifier. No grid search is used here because a
        plain decision tree already performs well.
    """
    return DecisionTreeClassifier(max_depth=12, random_state=5)
@property
def inputs(self):
"""Return the input fields of the current configuration"""
return list(set(self.iwp.inputs) | set(self.ice_cloud.inputs))
@property
def iwp(self):
"""Return the IWP regressor of SPARE-ICE"""
return self._iwp
@property
def ice_cloud(self):
"""Return the ice cloud classifier of SPARE-ICE"""
return self._ice_cloud
def load(self, filename):
    """Load SPARE-ICE parameters from a json file

    Args:
        filename: Path and name of the file.

    Returns:
        None
    """
    # The file holds a repr() of a python dict, hence literal_eval
    # rather than a json parser:
    with open(filename) as infile:
        parameters = literal_eval(infile.read())
    self._iwp = RetrievalProduct.from_dict(parameters["iwp"])
    self._ice_cloud = RetrievalProduct.from_dict(parameters["ice_cloud"])
[docs] def save(self, filename):
"""Save SPARE-ICE to a json file
Notes:
The output format is not standard json!
Args:
filename: Path and name of the file.
Returns:
None
"""
with open(filename, 'w') as outfile:
dictionary = {
"iwp": self.iwp.to_dict(),
"ice_cloud": self.ice_cloud.to_dict(),
}
outfile.write(repr(dictionary))
[docs] def standardize_collocations(self, data, fields=None, add_sea_mask=True,
add_elevation=True):
"""Convert collocation fields to standard SPARE-ICE fields.
Args:
data: A xarray.Dataset object with collocations either amongst
2C-ICE, MHS & AVHRR or MHS & AVHRR.
fields (optional): Fields that will be selected from the
collocations. If None (default), all fields will be selected.
add_sea_mask: Add a flag to the data whether the pixel is over sea
or land.
add_elevation: Add the surface elevation in meters to each pixel.
Returns:
A pandas.DataFrame with all selected fields.
"""
# Check whether the data is coming from a twice-collocated dataset:
if "MHS_2C-ICE/MHS/scnpos" in data.variables:
prefix = "MHS_2C-ICE/"
else:
prefix = ""
# The keys of this dictionary are the new names, while the values are
# old the names coming from the original collocations. If the value is
# a list, the variable is 2-dimensional. The first element is the old
# name, and the rest is the dimnesion that should be selected.
mapping = {
"mhs_channel1": [
f"{prefix}MHS/Data/btemps", f"{prefix}MHS/channel", 0
],
"mhs_channel2": [
f"{prefix}MHS/Data/btemps", f"{prefix}MHS/channel", 1
],
"mhs_channel3": [
f"{prefix}MHS/Data/btemps", f"{prefix}MHS/channel", 2
],
"mhs_channel4": [
f"{prefix}MHS/Data/btemps", f"{prefix}MHS/channel", 3
],
"mhs_channel5": [
f"{prefix}MHS/Data/btemps", f"{prefix}MHS/channel", 4
],
"lat": "lat",
"lon": "lon",
"time": "time",
"mhs_scnpos": f"{prefix}MHS/scnpos",
"solar_azimuth_angle":
f"{prefix}MHS/Geolocation/Solar_azimuth_angle",
"solar_zenith_angle":
f"{prefix}MHS/Geolocation/Solar_zenith_angle",
"satellite_azimuth_angle":
f"{prefix}MHS/Geolocation/Satellite_azimuth_angle",
"satellite_zenith_angle":
f"{prefix}MHS/Geolocation/Satellite_zenith_angle",
"avhrr_channel1": [
"AVHRR/Data/btemps_mean", "AVHRR/channel", 0
],
"avhrr_channel2": [
"AVHRR/Data/btemps_mean", "AVHRR/channel", 1
],
"avhrr_channel3": [
"AVHRR/Data/btemps_mean", "AVHRR/channel", 2
],
"avhrr_channel4": [
"AVHRR/Data/btemps_mean", "AVHRR/channel", 3
],
"avhrr_channel5": [
"AVHRR/Data/btemps_mean", "AVHRR/channel", 4
],
"avhrr_channel1_std": [
"AVHRR/Data/btemps_std", "AVHRR/channel", 0
],
"avhrr_channel2_std": [
"AVHRR/Data/btemps_std", "AVHRR/channel", 1
],
"avhrr_channel3_std": [
"AVHRR/Data/btemps_std", "AVHRR/channel", 2
],
"avhrr_channel4_std": [
"AVHRR/Data/btemps_std", "AVHRR/channel", 3
],
"avhrr_channel5_std": [
"AVHRR/Data/btemps_std", "AVHRR/channel", 4
],
"iwp_number": "MHS_2C-ICE/2C-ICE/ice_water_path_number",
"iwp_std": "MHS_2C-ICE/2C-ICE/ice_water_path_std",
}
# These fields need a special treatment
special_fields = ["avhrr_tir_diff", "mhs_diff", "iwp", "ice_cloud"]
# Default - take all fields:
if fields is None:
fields = list(mapping.keys()) + special_fields
return_data = {}
for field in fields:
if field in special_fields:
# We will do this later:
continue
elif field not in mapping:
# Some fields might be added later (such as elevation, etc)
continue
key = mapping[field]
try:
if isinstance(key, list):
return_data[field] = data[key[0]].isel(
**{key[1]: key[2]}
)
else:
return_data[field] = data[key]
except KeyError:
# Keep things easy. Collocations might contain the target
# dataset or not. We do not want to have a problem just because
# we have not them.
pass
return_data = pd.DataFrame(return_data)
if "avhrr_tir_diff" in fields:
return_data["avhrr_tir_diff"] = \
return_data["avhrr_channel5"] - return_data["avhrr_channel4"]
if "mhs_diff" in fields:
return_data["mhs_diff"] = \
return_data["mhs_channel5"] - return_data["mhs_channel3"]
if "iwp" in fields and "MHS_2C-ICE/2C-ICE/ice_water_path_mean" in data:
# We transform the IWP to log space because it is better for the
# ANN training. Zero values might trigger warnings and
# result in -INF. However, we cannot drop them because the ice
# cloud classifier needs zero values for its training.
with warnings.catch_warnings():
warnings.simplefilter("ignore")
return_data["iwp"] = np.log10(
data["MHS_2C-ICE/2C-ICE/ice_water_path_mean"]
)
return_data["iwp"].replace(
[-np.inf, np.inf], np.nan, inplace=True
)
if "ice_cloud" in fields \
and "MHS_2C-ICE/2C-ICE/ice_water_path_mean" in data:
return_data["ice_cloud"] = \
data["MHS_2C-ICE/2C-ICE/ice_water_path_mean"] > 0
if add_sea_mask:
return_data["sea_mask"] = sea_mask(
return_data.lat, return_data.lon, self.sea_mask
)
if add_elevation:
def get_grid_value(grid, lat, lon):
lat = to_array(lat)
lon = to_array(lon)
if lon.min() < -180 or lon.max() > 180:
raise ValueError("Longitudes out of bounds!")
if lat.min() < -90 or lat.max() > 90:
raise ValueError("Latitudes out of bounds!")
grid_lat_step = 180 / (grid.shape[0] - 1)
grid_lon_step = 360 / (grid.shape[1] - 1)
lat_cell = (90 - lat) / grid_lat_step
lon_cell = lon / grid_lon_step
return grid[lat_cell.astype(int), lon_cell.astype(int)]
return_data["elevation"] = get_grid_value(
self.elevation_grid, return_data.lat, return_data.lon
)
# We do not need the depth of the oceans (this would just
# confuse the ANN):
return_data["elevation"][return_data.elevation < 0] = 0
return return_data
[docs] def retrieve(self, data, as_log10=False):
"""Retrieve SPARE-ICE for the input variables
Args:
data: A pandas.DataFrame object with required input fields (see
above) or a xarray.Dataset if `from_collocations` is True.
as_log10: If true, the retrieved IWP will be returned as logarithm
of base 10.
Returns:
A pandas DataFrame object with the retrieved IWP and ice cloud
flag.
"""
# Retrieve the ice water path:
retrieved = self.iwp.retrieve(data[self.iwp.inputs])
if not as_log10 and retrieved is not None:
retrieved["iwp"] = 10**retrieved["iwp"]
# Retrieve the ice cloud flag:
retrieved = retrieved.join(
self.ice_cloud.retrieve(data[self.ice_cloud.inputs]),
)
return retrieved
@staticmethod
def _retrieve_from_collocations(collocations, _, spareice):
    """Run SPARE-ICE on the content of one collocations file.

    Used as a map / post-processor callback, hence the static method
    with an explicit `spareice` argument.

    Args:
        collocations: A xarray.Dataset with MHS-AVHRR collocations.
        _: File info object from the map interface (unused).
        spareice: The SPAREICE object that performs the retrieval.

    Returns:
        A xarray.Dataset with the retrieved IWP and ice cloud flag plus
        geolocation, or None if no valid (NaN-free) collocations remain.
    """
    # We need collapsed collocations:
    if "Collocations/pairs" in collocations.variables:
        collocations = collapse(collocations, reference="MHS")
    # However, we do not need the original field names
    collocations = spareice.standardize_collocations(collocations)
    # Remove NaNs from the data:
    collocations = collocations.dropna()
    if collocations.empty:
        return None
    # Retrieve the IWP and the ice cloud flag:
    retrieved = spareice.retrieve(collocations).to_xarray()
    start = collocations.time.min()
    end = collocations.time.max()
    spareice._debug(f"Retrieve SPARE-ICE from {start} to {end}")
    # Add more information:
    retrieved["iwp"].attrs = {
        "units": "g/m^2",
        "name": "Ice Water Path",
        "description": "Ice Water Path (retrieved by SPARE-ICE)."
    }
    retrieved["ice_cloud"].attrs = {
        "units": "boolean",
        "name": "Ice Cloud Flag",
        "description": "True if pixel contains an ice cloud (retrieved"
                       " by SPARE-ICE)."
    }
    # Carry the geolocation and scan position over to the output:
    retrieved["lat"] = collocations["lat"]
    retrieved["lon"] = collocations["lon"]
    retrieved["time"] = collocations["time"]
    retrieved["scnpos"] = collocations["mhs_scnpos"]
    return retrieved
def retrieve_from_collocations(
        self, inputs, output, start=None, end=None, processes=None
):
    """Retrieve SPARE-ICE from collocations between MHS and AVHRR

    You can use this either with already collocated MHS and AVHRR data
    (pass the :class:`Collocations` object via `inputs`) or you let MHS
    and AVHRR be collocated on-the-fly by passing the filesets with the
    raw data (pass two filesets as list via `inputs`).

    Args:
        inputs: Can be :class:`Collocations` or a list with
            :class:`~typhon.files.fileset.FileSet` objects. If it is a
            :class:`Collocations` object, all files from them are
            processed and used as input for SPARE-ICE.
        output: Must be a path with placeholders or a :class:`FileSet`
            object where the output files should be stored.
        start: Start date either as datetime object or as string
            ("YYYY-MM-DD hh:mm:ss"). Year, month and day are required.
            Hours, minutes and seconds are optional. If not given, it is
            datetime.min per default.
        end: End date. Same format as "start". If not given, it is
            datetime.max per default.
        processes: Number of processes to parallelize the collocation
            search. If not set, the value from the initialization is
            taken.

    Returns:
        None
    """
    if processes is None:
        processes = self.processes
    # Fail early if required auxiliary data was not passed to __init__:
    if "sea_mask" in self.inputs and self.sea_mask is None:
        raise ValueError("You have to pass a sea_mask file via init!")
    if "elevation" in self.inputs and self.elevation_grid is None:
        raise ValueError("You have to pass a elevation file via init!")
    timer = Timer.start()
    if isinstance(inputs, Collocations):
        # Simply apply a map function to all files from these
        # collocations:
        inputs.map(
            SPAREICE._retrieve_from_collocations, kwargs={
                "spareice": self,
            }, on_content=True, pass_info=True, start=start, end=end,
            max_workers=processes, output=output, worker_type="process"
        )
    elif len(inputs) == 2:
        # Collocate MHS and AVHRR on-the-fly:
        names = set(fileset.name for fileset in inputs)
        if "MHS" not in names or "AVHRR" not in names:
            raise ValueError(
                "You must name the input filesets MHS and AVHRR! Their "
                f"current names are: {names}"
            )
        iterator = self.collocator.collocate_filesets(
            inputs, start=start, end=end, processes=processes,
            max_interval="30s", max_distance="7.5 km", output=output,
            post_processor=SPAREICE._retrieve_from_collocations,
            post_processor_kwargs={
                "spareice": self,
            },
        )
        for filename in iterator:
            if filename is not None:
                # Bug fix: include the actual output filename in the
                # log message (it was missing before).
                self._info(f"Stored SPARE-ICE to\n{filename}")
    else:
        raise ValueError(
            "You need to pass a Collocations object or a list with a MHS "
            "and AVHRR fileset!"
        )
    logger.info(f"Took {timer} hours to retrieve SPARE-ICE")
[docs] def score(self, data):
"""Calculate the score of SPARE-ICE on testing data
Args:
data: A pandas.DataFrame object with the required input fields.
Returns:
The score for the IWP regressor and the score for the ice cloud
classifier.
"""
ice_cloud_score = self.ice_cloud.score(
data[self.ice_cloud.inputs], data[self.ice_cloud.outputs]
)
# We cannot allow NaN or Inf (resulting from transformation to
# log space)
data = data.dropna()
iwp_score = self.iwp.score(
data[self.iwp.inputs], data[self.iwp.outputs]
)
return iwp_score, ice_cloud_score
def train(self, data, iwp_inputs=None, ice_cloud_inputs=None,
          iwp_model=None, ice_cloud_model=None, processes=None,
          cv_folds=None):
    """Train SPARE-ICE with data

    This trains the IWP regressor and ice cloud classifier.

    Args:
        data: A pandas.DataFrame object with the required input fields.
        iwp_inputs: A list with the input field names for the IWP
            regressor. If this is None, the IWP regressor won't be
            trained.
        ice_cloud_inputs: A list with the input field names for the ice
            cloud classifier. If this is None, the ice cloud classifier
            won't be trained.
        iwp_model: Set this to your own sklearn estimator class.
        ice_cloud_model: Set this to your own sklearn estimator class.
        processes: Number of processes to parallelize the regressor
            training. If not set, the value from the initialization is
            taken.
        cv_folds: Number of folds used for cross-validation. Default is
            5. The higher the number the more data is used for training
            but the runtime increases. Good values are between 3 and 10.

    Returns:
        None
    """
    if iwp_inputs is None and ice_cloud_inputs is None:
        raise ValueError("Either fields for the IWP regressor or ice "
                         "cloud classifier must be given!")
    if ice_cloud_inputs is not None:
        self._info("Train SPARE-ICE - ice cloud classifier")
        if ice_cloud_model is None:
            ice_cloud_model = self._ice_cloud_model()
        # The classifier is trained against the "ice_cloud" column:
        score = self.ice_cloud.train(
            ice_cloud_model,
            data[ice_cloud_inputs], data[["ice_cloud"]],
        )
        self._info(f"Ice cloud classifier training score: {score:.2f}")
    if iwp_inputs is not None:
        self._info("Train SPARE-ICE - IWP regressor")
        # We cannot allow NaN or Inf (resulting from transformation to
        # log space)
        data = data.dropna()
        if processes is None:
            processes = self.processes
        if cv_folds is None:
            cv_folds = 5
        if iwp_model is None:
            iwp_model = self._iwp_model(processes, cv_folds)
        # The regressor is trained against the log10 "iwp" column:
        score = self.iwp.train(
            iwp_model, data[iwp_inputs], data[["iwp"]],
        )
        self._info(f"IWP regressor training score: {score:.2f}")
        # Log cross-validation details if a grid search was used:
        self._training_report(iwp_model)
@staticmethod
def _training_report(trainer):
    """Log the cross-validation results of a grid-search trainer.

    Args:
        trainer: A fitted sklearn estimator. Only estimators exposing a
            ``cv_results_`` attribute (e.g. ``GridSearchCV``) produce a
            report; everything else is silently ignored.

    Returns:
        None
    """
    if not hasattr(trainer, "cv_results_"):
        return
    # Bug fix: the original message had no %s placeholder, so
    # best_params_ was never rendered into the log record.
    logger.info("Best parameters found on training dataset:\n%s",
                trainer.best_params_)
    means = trainer.cv_results_['mean_test_score']
    stds = trainer.cv_results_['std_test_score']
    for mean, std, params in zip(means, stds, trainer.cv_results_['params']):  # noqa
        # Report each candidate's mean score with a 2-sigma interval.
        # Lazy %-style args avoid formatting when the level is disabled:
        logger.info("%0.3f (+/-%0.03f) for %r", mean, std * 2, params)
def report(self, output_dir, experiment, data):
    """Test the performance of SPARE-ICE and plot it

    Args:
        output_dir: A path to a directory (does not need to exist). A
            subdirectory named `experiment` will be created there. All
            plots are stored to it.
        experiment: A name for the experiment as a string. Will be
            included in the title of the plots and used as name for the
            subdirectory in `output_dir`.
        data: A pandas.DataFrame object with the required input fields.

    Returns:
        None
    """
    # All plots of this experiment go into their own subdirectory:
    report_dir = join(output_dir, experiment)
    os.makedirs(report_dir, exist_ok=True)
    # Run SPARE-ICE; keep the IWP in log10 space for plotting:
    retrieved = self.retrieve(data, as_log10=True)
    # Plot the performance of both retrieval components:
    self._report_iwp(report_dir, experiment, data, retrieved)
    self._report_ice_cloud(report_dir, experiment, data, retrieved)
def _report_iwp(self, output_dir, experiment, test, retrieved):
    """Create and store the plots for IWP regressor

    Args:
        output_dir: Directory where the plot files are stored.
        experiment: Name of the experiment, used in plot titles.
        test: A pandas.DataFrame with the testing data (needs `iwp` in
            log10 space, `sea_mask` and `lat` fields).
        retrieved: A pandas.DataFrame with the retrieved `iwp` in log10
            space (from :meth:`retrieve` with ``as_log10=True``).

    Returns:
        None
    """
    # Plot the heatmap with the retrieved IWPs
    fig, ax = plt.subplots(figsize=(10, 8))
    scat = heatmap(
        test.iwp,
        retrieved.iwp,
        bins=50, range=[[-1, 4], [-1, 4]],
        cmap="density", vmin=5,
    )
    # Render bins below vmin in white instead of the colormap:
    scat.cmap.set_under("w")
    ax.set_xlabel("log10 IWP (2C-ICE) [g/m^2]")
    ax.set_ylabel("log10 IWP (SPARE-ICE) [g/m^2]")
    ax.set_title(experiment)
    fig.colorbar(scat, label="Number of points")
    fig.savefig(join(output_dir, "2C-ICE-SPAREICE_heatmap.png"))
    self._plot_scatter(
        experiment,
        join(output_dir, "2C-ICE-SPAREICE_scatter_{area}.png"),
        test.iwp, retrieved.iwp, test.sea_mask.values
    )
    # MFE plot with 2C-ICE on x-axis
    # Fractional error in percent, computed from the linear-space IWPs
    # (both values are stored as log10, hence the 10** terms):
    fe = 100 * (
        np.exp(np.abs(
            np.log(
                10 ** retrieved.iwp.values
                / 10 ** test.iwp.values
            )
        ))
        - 1
    )
    self._plot_error(
        experiment, join(output_dir, "2C-ICE-SPAREICE_mfe.png"),
        test,
        fe, test.sea_mask.values,
    )
    # Plot the bias:
    bias = retrieved.iwp.values - test.iwp.values
    self._plot_error(
        experiment, join(output_dir, "2C-ICE-SPAREICE_bias.png"),
        test,
        bias, test.sea_mask.values,
        mfe=False, yrange=[-0.35, 0.45],
    )
    # self._plot_weights(
    #     experiment, join(output_dir, "SPAREICE_iwp_weights.png"),
    # )
    # Also store the binned median fractional error as plain text:
    with open(join(output_dir, "mfe.txt"), "w") as file:
        mfe = sci_binned_statistic(
            test.iwp.values,
            fe, statistic="median", bins=20,
            range=[0, 4],
        )
        file.write(repr(mfe[0]))
@staticmethod
def _plot_scatter(experiment, file, xdata, ydata, sea_mask):
    """Create scatter plots of reference vs. retrieved IWP.

    One plot is stored per area ("all", "land", "sea"), using the
    `{area}` placeholder in *file* for the output filename.

    Args:
        experiment: Name of the experiment, used in the plot title.
        file: Output filename template containing an `{area}` field.
        xdata: Reference log10 IWP values (2C-ICE).
        ydata: Retrieved log10 IWP values (SPARE-ICE).
        sea_mask: Boolean array, True where the pixel is over sea.

    Returns:
        None
    """
    for area in ["all", "land", "sea"]:
        if area == "all":
            # A full slice selects every point:
            mask = slice(None, None, None)
        elif area == "land":
            mask = ~sea_mask
        else:
            mask = sea_mask
        fig, ax = plt.subplots(figsize=(10, 8))
        ax.scatter(
            xdata[mask], ydata[mask],
            s=1, alpha=0.6
        )
        ax.grid()
        ax.set_xlabel("log10 IWP (2C-ICE) [g/m^2]")
        ax.set_ylabel("log10 IWP (SPARE-ICE) [g/m^2]")
        ax.set_title(f"{experiment} - {area}")
        fig.savefig(file.format(area=area))
@staticmethod
def _plot_error(
        experiment, file, xdata, error, sea_mask, mfe=True, yrange=None
):
    """Plot a binned error statistic of the retrieved IWP.

    Args:
        experiment: Name of the experiment, used in the plot title.
        file: Output filename for the figure.
        xdata: A pandas.DataFrame with the reference `iwp` (log10 space)
            and `lat` fields.
        error: Array with per-sample error values (fractional error or
            bias, matching `mfe`).
        sea_mask: Boolean array, True where the pixel is over sea.
        mfe: If True, plot the median fractional error; otherwise the
            mean bias.
        yrange: Optional y-axis limits as [low, high].

    Returns:
        None
    """
    fig, ax = plt.subplots(figsize=(10, 8))
    xlabel = "log10 IWP (2C-ICE) [g/m^2]"
    xrange = [0, 4]
    if mfe:
        ax.set_ylabel("Median fractional error [%]")
        ax.set_ylim([0, 200])
        statistic = "median"
    else:
        # Bug fix: raw string avoids the invalid escape sequence "\D";
        # the rendered label is unchanged:
        ax.set_ylabel(r"$\Delta$ IWP (SPARE-ICE - 2C-ICE) [log 10 g/m^2]")
        statistic = "mean"
    for hemisphere in ["global"]:
        for area in ["all", "land", "sea"]:
            if area == "all":
                mask = np.repeat(True, xdata.iwp.size)
            elif area == "land":
                mask = ~sea_mask
            else:
                mask = sea_mask
            # Only "global" is used above; these branches enable
            # hemispheric curves if the list is extended:
            if hemisphere == "north":
                mask &= xdata.lat.values >= 0
            elif hemisphere == "south":
                mask &= xdata.lat.values < 0
            binned_statistic(
                xdata.iwp.values[mask],
                error[mask], statistic=statistic, bins=20,
                range=xrange, pargs={"marker": "o",
                                     "label": f"{area} - {hemisphere}"}
            )
    ax.set_xlabel(xlabel)
    ax.grid()
    ax.legend(fancybox=True)
    ax.set_title(f"Experiment: {experiment}")
    if yrange is not None:
        ax.set_ylim(yrange)
    fig.tight_layout()
    fig.savefig(file)
def _plot_weights(self, title, file, layer_index=0, vmin=-5, vmax=5):
    """Plot the weight matrix of one layer of the IWP neural network.

    Args:
        title: NOTE(review): currently unused -- the figure gets no
            title; confirm whether it should be applied.
        file: Output filename for the figure.
        layer_index: Index of the layer whose weights are plotted
            (0, the input layer, by default).
        vmin: Lower limit of the colormap.
        vmax: Upper limit of the colormap.

    Returns:
        None
    """
    import seaborn as sns
    sns.set_context("paper")
    # The MLP is the last pipeline step; coefs_ holds one weight
    # matrix per layer:
    layers = self.iwp.estimator.steps[-1][1].coefs_
    layer = layers[layer_index]
    f, ax = plt.subplots(figsize=(18, 12))
    weights = pd.DataFrame(layer)
    # Label the rows with the regressor's input field names:
    weights.index = self.iwp.inputs
    sns.set(font_scale=1.1)
    # Draw a heatmap with the numeric values in each cell
    sns.heatmap(
        weights, annot=True, fmt=".1f", linewidths=.5, ax=ax,
        cmap="difference", center=0, vmin=vmin, vmax=vmax,
        # annot_kws={"size":14},
    )
    ax.tick_params(labelsize=18)
    f.tight_layout()
    f.savefig(file)
def _report_ice_cloud(self, output_dir, experiment, test, retrieved):
    """Create and store the plots for the ice cloud classifier.

    Stores a normalized confusion matrix and the feature importances
    of the decision tree.

    Args:
        output_dir: Directory where the plot files are stored.
        experiment: Name of the experiment (not shown in these plots).
        test: A pandas.DataFrame with the true `ice_cloud` flags.
        retrieved: A pandas.DataFrame with the retrieved `ice_cloud`
            flags.

    Returns:
        None
    """
    # Confusion matrix:
    fig, ax = plt.subplots(figsize=(12, 10))
    cm = confusion_matrix(test.ice_cloud, retrieved.ice_cloud)
    img = self._plot_matrix(cm, classes=["Yes", "No"], normalize=True)
    fig.colorbar(img, label="probability")
    ax.set_title("Ice Cloud Classifier - Performance")
    ax.set_ylabel('real ice cloud')
    ax.set_xlabel('predicted ice cloud')
    fig.tight_layout()
    fig.savefig(join(output_dir, "ice-cloud-confusion-matrix.png"))
    # Feature importances of the decision tree classifier:
    fig, ax = plt.subplots(figsize=(12, 10))
    ax.barh(
        np.arange(len(self.ice_cloud.inputs)),
        self.ice_cloud.estimator.feature_importances_
    )
    ax.set_yticks(np.arange(len(self.ice_cloud.inputs)))
    ax.set_yticklabels(self.ice_cloud.inputs)
    ax.set_xlabel("Feature Importance")
    ax.set_ylabel("Feature")
    ax.set_title("Ice Cloud Classifier - Importance")
    fig.savefig(join(output_dir, "ice-cloud-feature-importance.png"))
@staticmethod
def _plot_matrix(
        matrix, classes, normalize=False, ax=None, **kwargs
):
    """Plot a confusion matrix as an annotated image.

    Normalization can be applied by setting `normalize=True`.

    Args:
        matrix: A 2-dimensional confusion matrix, e.g. from
            sklearn.metrics.confusion_matrix.
        classes: Tick labels for the matrix rows and columns.
        normalize: If True, normalize each row to sum to one.
        ax: Axes to draw on. Defaults to the current axes.
        **kwargs: Further keyword arguments passed to `imshow`.

    Returns:
        The image object created by `imshow`.
    """
    if normalize:
        # Normalize each row (true class) to probabilities:
        matrix = matrix.astype('float') / matrix.sum(axis=1)[:, np.newaxis]
    default_kwargs = {
        "cmap": "Blues",
        **kwargs
    }
    if ax is None:
        ax = plt.gca()
    img = ax.imshow(matrix, interpolation='nearest', **default_kwargs)
    tick_marks = np.arange(len(classes))
    ax.set_xticks(tick_marks)
    ax.set_xticklabels(classes, rotation=45)
    ax.set_yticks(tick_marks)
    ax.set_yticklabels(classes)
    # Integers for raw counts, two decimals for probabilities:
    fmt = '.2f' if normalize else 'd'
    # Pick the annotation color for contrast against the cell color:
    thresh = matrix.max() / 2.
    for i, j in itertools.product(range(matrix.shape[0]),
                                  range(matrix.shape[1])):
        ax.text(j, i, format(matrix[i, j], fmt),
                horizontalalignment="center",
                color="white" if matrix[i, j] > thresh else "black")
    return img